
Compare commits


72 Commits

Author SHA1 Message Date
Olivier Wilkinson (reivilibre)
a344ad3d3f Code formatting (Black)
Signed-off-by: Olivier Wilkinson (reivilibre) <olivier@librepush.net>
2019-08-28 09:33:03 +01:00
reivilibre
b9f1adc370 Update synapse/storage/stats.py
Co-Authored-By: Erik Johnston <erik@matrix.org>
2019-08-28 09:01:25 +01:00
Olivier Wilkinson (reivilibre)
1af7866562 Clean up code with improved naming and hoist around functions.
Signed-off-by: Olivier Wilkinson (reivilibre) <olivier@librepush.net>
2019-08-27 16:16:53 +01:00
Olivier Wilkinson (reivilibre)
324f21b216 Fix logic error.
`absolute_fields` being None shouldn't preclude completion of a current
stats row.

Signed-off-by: Olivier Wilkinson (reivilibre) <olivier@librepush.net>
2019-08-27 14:53:45 +01:00
Olivier Wilkinson (reivilibre)
62b1250629 Update _purge_room_txn to take account of separated stats tables
Signed-off-by: Olivier Wilkinson (reivilibre) <olivier@librepush.net>
2019-08-27 14:24:56 +01:00
Olivier Wilkinson (reivilibre)
11c4e506bd Rename room_state table to room_stats_state
Signed-off-by: Olivier Wilkinson (reivilibre) <olivier@librepush.net>
2019-08-27 14:24:25 +01:00
Olivier Wilkinson (reivilibre)
491eaf0808 Remove obsolete OldCollectionRequired as old collection is obsolete.
Signed-off-by: Olivier Wilkinson (reivilibre) <olivier@librepush.net>
2019-08-27 14:19:38 +01:00
Olivier Wilkinson (reivilibre)
c775f310e9 Don't include the room & user stats docs in this PR.
Signed-off-by: Olivier Wilkinson (reivilibre) <olivier@librepush.net>
2019-08-27 13:51:25 +01:00
Olivier Wilkinson (reivilibre)
09cbc3a8e9 Switch to milliseconds in room/user stats for consistency.
Signed-off-by: Olivier Wilkinson (reivilibre) <olivier@librepush.net>
2019-08-27 13:50:58 +01:00
Olivier Wilkinson (reivilibre)
736ac58e11 Code formatting (Black)
Signed-off-by: Olivier Wilkinson (reivilibre) <olivier@librepush.net>
2019-08-27 13:42:33 +01:00
Olivier Wilkinson (reivilibre)
a6c102009e Lock tables in upsert fall-backs.
Should not be too much of a performance concern as this code won't be
hit on Postgres, which large deployments should be using.

Signed-off-by: Olivier Wilkinson (reivilibre) <olivier@librepush.net>
2019-08-27 13:41:48 +01:00
Olivier Wilkinson (reivilibre)
544ba2c2e9 Apply minor suggestions from review
Signed-off-by: Olivier Wilkinson (reivilibre) <olivier@librepush.net>
2019-08-27 13:40:00 +01:00
Olivier Wilkinson (reivilibre)
81c5289c83 Clarify _update_stats_delta_txn by adding code comments and kwargs.
Signed-off-by: Olivier Wilkinson (reivilibre) <olivier@librepush.net>
2019-08-27 13:35:37 +01:00
reivilibre
4b7bf2e413 Apply suggestions from code review
Co-Authored-By: Erik Johnston <erik@matrix.org>
2019-08-27 13:26:08 +01:00
Olivier Wilkinson (reivilibre)
5043ef801a Merge branch 'rei/rss_target' into rei/rss_inc2 2019-08-27 11:56:34 +01:00
Olivier Wilkinson (reivilibre)
baeaf00a12 Merge branch 'develop' into rei/rss_target 2019-08-27 11:55:27 +01:00
reivilibre
322ccac33f Allow schema deltas to be engine-specific (#5911)
* Allow schema deltas to be engine-specific

Signed-off-by: Olivier Wilkinson (reivilibre) <olivier@librepush.net>

* Newsfile

Signed-off-by: Olivier Wilkinson (reivilibre) <olivier@librepush.net>

* Code style (Black)

Signed-off-by: Olivier Wilkinson (reivilibre) <olivier@librepush.net>
2019-08-27 11:53:21 +01:00
Richard van der Hoff
ccb15a5bbe Merge pull request #5906 from matrix-org/neilj/increase_display_name_limit
Increase profile display name limit
2019-08-27 11:52:59 +01:00
Erik Johnston
f5b50d0871 Merge pull request #5895 from matrix-org/erikj/notary_key
Add config option to sign remote key query responses with a separate key.
2019-08-27 11:51:37 +01:00
Richard van der Hoff
e7577427c9 Update 5909.misc 2019-08-27 11:50:52 +01:00
Richard van der Hoff
7837a5f2ea Merge pull request #5909 from aaronraimist/public_base_url
public_base_url is actually public_baseurl
2019-08-27 11:49:59 +01:00
reivilibre
1a7e6eb633 Add Admin API capability to set adminship of a user (#5878)
Admin API: Set adminship of a user
2019-08-27 10:14:00 +01:00
Olivier Wilkinson (reivilibre)
1ecd1a6a5f Use engine-specific delta SQL files rather than delta written in Python.
Signed-off-by: Olivier Wilkinson (reivilibre) <olivier@librepush.net>
2019-08-27 09:50:49 +01:00
Olivier Wilkinson (reivilibre)
d1e0b91083 Code style (Black)
Signed-off-by: Olivier Wilkinson (reivilibre) <olivier@librepush.net>
2019-08-27 09:39:11 +01:00
Olivier Wilkinson (reivilibre)
62a1639287 Newsfile
Signed-off-by: Olivier Wilkinson (reivilibre) <olivier@librepush.net>
2019-08-27 09:36:12 +01:00
Olivier Wilkinson (reivilibre)
aefa76f5cd Allow schema deltas to be engine-specific
Signed-off-by: Olivier Wilkinson (reivilibre) <olivier@librepush.net>
2019-08-27 09:14:00 +01:00
Olivier Wilkinson (reivilibre)
c3d2bf2807 Allow schema deltas to be engine-specific
Signed-off-by: Olivier Wilkinson (reivilibre) <olivier@librepush.net>
2019-08-27 08:52:20 +01:00
Aaron Raimist
c25137a99f Add changelog
Signed-off-by: Aaron Raimist <aaron@raim.ist>
2019-08-26 21:06:10 -05:00
Aaron Raimist
e8e3e033ee public_base_url is actually public_baseurl
Signed-off-by: Aaron Raimist <aaron@raim.ist>
2019-08-26 21:01:56 -05:00
Neil Johnson
27d3fc421a Increase max display name limit 2019-08-24 22:33:43 +01:00
Erik Johnston
fe0ac98e66 Don't implicitly include server signing key 2019-08-23 15:36:28 +01:00
Erik Johnston
7af5a63063 Fixup review comments 2019-08-23 15:36:28 +01:00
Jorik Schellekens
8767b63a82 Propagate opentracing contexts through EDUs (#5852)
Propagate opentracing contexts through EDUs
Co-Authored-By: Richard van der Hoff <1389908+richvdh@users.noreply.github.com>
2019-08-22 18:21:10 +01:00
Richard van der Hoff
0b39fa53b6 Merge pull request #5877 from Awesome-Technologies/remove_shared_secret_registration
Remove shared secret registration
2019-08-22 18:12:25 +01:00
Jorik Schellekens
812ed6b0d5 Opentracing across workers (#5771)
Propagate opentracing contexts across workers


Also includes some convenience modifications to opentracing for servlets, notably:
- Add a boolean to skip the whitelisting check on the inject and
  extract methods; useful when injecting into carriers locally,
  since otherwise we'd always have to include our own servername
  and whitelist it
- start_active_span_from_request instead of header
- Add a boolean to decide whether to extract context
  from a request to a servlet
2019-08-22 18:08:07 +01:00
Manuel Stahl
0bab582fd6 Remove shared secret registration from client/r0/register endpoint
This type of registration was probably never used. It only includes the
user name in the HMAC but not the password.

Shared secret registration is still available via
client/r0/admin/register.

Signed-off-by: Manuel Stahl <manuel.stahl@awesome-technologies.de>
2019-08-22 18:04:08 +02:00
Olivier Wilkinson (reivilibre)
79252d1c83 Fix up historical stats support.
Signed-off-by: Olivier Wilkinson (reivilibre) <olivier@librepush.net>
2019-08-22 16:10:32 +01:00
Olivier Wilkinson (reivilibre)
e8fc180d4d Fix up SQL schema delta
Signed-off-by: Olivier Wilkinson (reivilibre) <olivier@librepush.net>
2019-08-22 16:10:05 +01:00
Olivier Wilkinson (reivilibre)
7b657f1148 Simplify table structure
This obviates the need for old collection, but comes at the minor cost
of not being able to track historical stats or per-slice fields until
after the statistics regenerator is finished.

Signed-off-by: Olivier Wilkinson (reivilibre) <olivier@librepush.net>
2019-08-22 15:40:58 +01:00
Brendan Abolivier
dbd46decad Revert "Do not send consent notices if "no-consent-required" is set"
This reverts commit 27a686e53b.
2019-08-22 14:47:43 +01:00
Brendan Abolivier
1c5b8c6222 Revert "Add "require_consent" parameter for registration"
This reverts commit 3320aaab3a.
2019-08-22 14:47:34 +01:00
Half-Shot
27a686e53b Do not send consent notices if "no-consent-required" is set 2019-08-22 14:22:04 +01:00
Half-Shot
3320aaab3a Add "require_consent" parameter for registration 2019-08-22 14:21:54 +01:00
Erik Johnston
1b09cf8658 Merge pull request #5850 from matrix-org/erikj/retry_well_known_on_fail
Retry well known on fail
2019-08-22 13:17:05 +01:00
Jorik Schellekens
9a6f2be572 Opentrace e2e keys (#5855)
Add opentracing tags and logs for e2e keys
2019-08-22 11:28:12 +01:00
Richard van der Hoff
c9f11d09fc Add missing index on users_in_public_rooms. (#5894) 2019-08-22 10:43:13 +01:00
Richard van der Hoff
119aa31b10 Servlet to purge old rooms (#5845) 2019-08-22 10:42:59 +01:00
Richard van der Hoff
ef1c524bb3 Improve error msg when key-fetch fails (#5896)
There's no point doing a raise_from here, because the exception is always
logged at warn with no stacktrace in the caller. Instead, let's try to give
better messages to reduce confusion.

In particular, this means that we won't log 'Failed to connect to remote
server' when we don't even attempt to connect to the remote server due to
blacklisting.
2019-08-22 10:42:06 +01:00
Richard van der Hoff
4dab867288 Drop some unused tables. (#5893)
These tables are never used, so we may as well drop them.
2019-08-21 13:16:28 +01:00
Erik Johnston
62fb643cdc Newsfile 2019-08-21 11:21:58 +01:00
Erik Johnston
97cbc96093 Only sign when we respond to remote key requests 2019-08-21 11:21:58 +01:00
Erik Johnston
5906be8589 Add config option for keys to use to sign keys
This allows servers to separate keys that are used to sign remote keys
when acting as a notary server.
2019-08-21 10:44:58 +01:00
Richard van der Hoff
72bc285669 Refactor the Appservice scheduler code (#5886)
Get rid of the labyrinthine `recoverer_fn` code, and clean up the startup code
(it seemed to be previously inexplicably split between
`ApplicationServiceScheduler.start` and `_Recoverer.start`).

Add some docstrings too.
2019-08-20 17:42:45 +01:00
Richard van der Hoff
baa3f4a80d Avoid deep recursion in appservice recovery (#5885)
Hopefully, this will fix a stack overflow when recovering an appservice.

The recursion here leads to a huge chain of deferred callbacks, which then
overflows the stack when the chain completes. `inlineCallbacks` does a better
job of this if we use iteration instead.

Clean up the code a bit too, while we're there.
2019-08-20 17:39:38 +01:00
Olivier Wilkinson (reivilibre)
18a4c03c50 Remove needless defaults.
Signed-off-by: Olivier Wilkinson (reivilibre) <olivier@librepush.net>
2019-08-20 16:04:04 +01:00
Olivier Wilkinson (reivilibre)
eafa8d3c54 Unify name of 'stats regenerator' in schema comments.
Signed-off-by: Olivier Wilkinson (reivilibre) <olivier@librepush.net>
2019-08-20 16:03:37 +01:00
Olivier Wilkinson (reivilibre)
977310ee27 Clarify _update_stats_delta_txn
Signed-off-by: Olivier Wilkinson (reivilibre) <olivier@librepush.net>
2019-08-20 15:49:00 +01:00
Olivier Wilkinson (reivilibre)
981c6cf544 Sanitise accepted fields in _update_stats_delta_txn
Signed-off-by: Olivier Wilkinson (reivilibre) <olivier@librepush.net>
2019-08-20 15:41:10 +01:00
Olivier Wilkinson (reivilibre)
6a19f7e101 Add room and user statistics documentation.
Signed-off-by: Olivier Wilkinson (reivilibre) <olivier@librepush.net>
2019-08-20 15:35:20 +01:00
reivilibre
4a97eef0dc Update synapse/storage/stats.py
Co-Authored-By: Erik Johnston <erik@matrix.org>
2019-08-20 15:12:21 +01:00
reivilibre
b5573c0ffb Update synapse/storage/stats.py
Co-Authored-By: Erik Johnston <erik@matrix.org>
2019-08-20 15:02:49 +01:00
Olivier Wilkinson (reivilibre)
1819563640 Ack, isort!
Signed-off-by: Olivier Wilkinson (reivilibre) <olivier@librepush.net>
2019-08-20 14:27:47 +01:00
Olivier Wilkinson (reivilibre)
80a1c6e9e5 Add storage function for storing stats deltas
Old collection is not included in this commit

Signed-off-by: Olivier Wilkinson (reivilibre) <olivier@librepush.net>
2019-08-20 14:08:02 +01:00
Jorik Schellekens
c886f976e0 Opentracing doc update (#5776)
Update opentracing docs to use the unified 'trace' method
2019-08-20 13:56:03 +01:00
Olivier Wilkinson (reivilibre)
d7675e79e1 Add schema for Separated Statistics
Signed-off-by: Olivier Wilkinson (reivilibre) <olivier@librepush.net>
2019-08-20 13:45:05 +01:00
reivilibre
8de9ebe35d Tear out current room & user statistics (#5880)
* Tear out current room & user statistics.

Signed-off-by: Olivier Wilkinson (reivilibre) <olivier@librepush.net>

* Black is back with more linting complaints

Signed-off-by: Olivier Wilkinson (reivilibre) <olivier@librepush.net>
2019-08-20 13:22:07 +01:00
Richard van der Hoff
5019945828 Refactor the Appservice scheduler code
Get rid of the labyrinthine `recoverer_fn` code, and clean up the startup code
(it seemed to be previously inexplicably split between
`ApplicationServiceScheduler.start` and `_Recoverer.start`).

Add some docstrings too.
2019-08-20 11:50:23 +01:00
Erik Johnston
1dec31560e Change jitter to be a factor rather than absolute value 2019-08-20 11:46:00 +01:00
Olivier Wilkinson (reivilibre)
8374bcb0a8 Newsfile
Signed-off-by: Olivier Wilkinson (reivilibre) <olivier@librepush.net>
2019-08-19 15:25:53 +01:00
Erik Johnston
861d663c15 Fixup changelog and remove debug logging 2019-08-16 13:15:26 +01:00
Erik Johnston
1771f0045d Newsfile 2019-08-15 09:28:58 +01:00
Erik Johnston
e6e136decc Retry well known on fail.
If we have recently seen a valid well-known for a domain, we want to
retry on (non-final) errors a few times, to handle temporary blips in
networking, etc.
2019-08-15 09:28:58 +01:00
62 changed files with 1640 additions and 1471 deletions

changelog.d/5771.feature

@@ -0,0 +1 @@
Make Opentracing work in worker mode.

changelog.d/5776.misc

@@ -0,0 +1 @@
Update opentracing docs to use the unified `trace` method.

changelog.d/5845.feature

@@ -0,0 +1 @@
Add an admin API to purge old rooms from the database.

changelog.d/5850.feature

@@ -0,0 +1 @@
Add retry to well-known lookups if we have recently seen a valid well-known record for the server.

changelog.d/5852.feature

@@ -0,0 +1 @@
Pass opentracing contexts between servers when transmitting EDUs.

changelog.d/5855.misc

@@ -0,0 +1 @@
Opentracing for room and e2e keys.

changelog.d/5877.removal

@@ -0,0 +1 @@
Remove shared secret registration from client/r0/register endpoint. Contributed by Awesome Technologies Innovationslabor GmbH.

changelog.d/5879.misc

@@ -0,0 +1 @@
Rework room and user statistics to separate current & historical rows, as well as track stats correctly.

changelog.d/5885.bugfix

@@ -0,0 +1 @@
Fix stack overflow when recovering an appservice which had an outage.

changelog.d/5886.misc

@@ -0,0 +1 @@
Refactor the Appservice scheduler code.

changelog.d/5893.misc

@@ -0,0 +1 @@
Drop some unused tables.

changelog.d/5894.misc

@@ -0,0 +1 @@
Add missing index on users_in_public_rooms to improve the performance of directory queries.

changelog.d/5895.feature

@@ -0,0 +1 @@
Add config option to sign remote key query responses with a separate key.

changelog.d/5896.misc

@@ -0,0 +1 @@
Improve the logging when we have an error when fetching signing keys.

changelog.d/5906.feature

@@ -0,0 +1 @@
Increase max display name size to 256.

changelog.d/5909.misc

@@ -0,0 +1 @@
Fix error message which referred to public_base_url instead of public_baseurl. Thanks to @aaronraimist for the fix!

changelog.d/5911.misc

@@ -0,0 +1 @@
Add support for database engine-specific schema deltas, based on file extension.

View File

@@ -0,0 +1,18 @@
Purge room API
==============
This API will remove all trace of a room from your database.
All local users must have left the room before it can be removed.
The API is:
```
POST /_synapse/admin/v1/purge_room
{
"room_id": "!room:id"
}
```
You must authenticate using the access token of an admin user.
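For illustration only, here is a minimal sketch of calling this endpoint from Python with the `requests` library; the homeserver URL, access token and room ID below are placeholder assumptions, not values taken from this PR:
```python
import requests

# Placeholder values: adjust for your deployment.
HOMESERVER = "http://localhost:8008"
ADMIN_TOKEN = "YOUR_ADMIN_ACCESS_TOKEN"  # access token of an admin user

resp = requests.post(
    f"{HOMESERVER}/_synapse/admin/v1/purge_room",
    headers={"Authorization": f"Bearer {ADMIN_TOKEN}"},
    json={"room_id": "!room:id"},
)
resp.raise_for_status()  # raises if the purge request was rejected
print(resp.json())
```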

View File

@@ -32,7 +32,7 @@ It is up to the remote server to decide what it does with the spans
it creates. This is called the sampling policy and it can be configured
through Jaeger's settings.
For OpenTracing concepts see
https://opentracing.io/docs/overview/what-is-tracing/.
For more information about Jaeger's implementation see
@@ -79,7 +79,7 @@ Homeserver whitelisting
The homeserver whitelist is configured using regular expressions. A list of regular
expressions can be given and their union will be compared when propagating any
spans contexts to another homeserver.
Though it's mostly safe to send and receive span contexts to and from
untrusted users since span contexts are usually opaque ids it can lead to
@@ -92,6 +92,29 @@ two problems, namely:
but that doesn't prevent another server sending you baggage which will be logged
to OpenTracing's logs.
==========
EDU FORMAT
==========
EDUs can contain tracing data in their content. This is not specced but
it could be of interest for other homeservers.
EDU format (if you're using jaeger):
.. code-block:: json
{
"edu_type": "type",
"content": {
"org.matrix.opentracing_context": {
"uber-trace-id": "fe57cf3e65083289"
}
}
}
Though you don't have to use jaeger, you must inject the span context into
`org.matrix.opentracing_context` using the opentracing `Format.TEXT_MAP` inject method.
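As an illustrative sketch (using the stock ``opentracing`` Python API rather than
Synapse's own helpers; ``add_tracing_context`` is a hypothetical function, not part
of this PR), injecting the active span context into an EDU's content could look like:

.. code-block:: python

    import json

    import opentracing
    from opentracing.propagation import Format

    def add_tracing_context(edu_content):
        # Serialise the active span context into a text map and attach it
        # under the org.matrix.opentracing_context key.
        carrier = {}
        span = opentracing.tracer.active_span
        if span is not None:
            opentracing.tracer.inject(span.context, Format.TEXT_MAP, carrier)
        edu_content["org.matrix.opentracing_context"] = json.dumps(carrier)
        return edu_content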
==================
Configuring Jaeger
==================

View File

@@ -1027,6 +1027,14 @@ signing_key_path: "CONFDIR/SERVERNAME.signing.key"
#
#trusted_key_servers:
# - server_name: "matrix.org"
#
# The signing keys to use when acting as a trusted key server. If not specified
# defaults to the server signing key.
#
# Can contain multiple keys, one per line.
#
#key_server_signing_keys_path: "key_server_signing_keys.key"
# Enable SAML2 for registration and login. Uses pysaml2.
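As a side note, a dedicated key file for `key_server_signing_keys_path` can be produced with the `signedjson` library (the same library Synapse uses for its signing keys). A hedged sketch, in which the key version string and output path are arbitrary examples rather than values from this PR:
```python
# Sketch: generate a standalone signing key for notary responses and write
# it to the path configured as key_server_signing_keys_path.
from signedjson.key import generate_signing_key, write_signing_keys

with open("key_server_signing_keys.key", "w") as f:
    # "a_key_version" is an example version string, not a value from this PR.
    write_signing_keys(f, [generate_signing_key("a_key_version")])
```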

View File

@@ -70,35 +70,37 @@ class ApplicationServiceScheduler(object):
self.store = hs.get_datastore()
self.as_api = hs.get_application_service_api()
def create_recoverer(service, callback):
return _Recoverer(self.clock, self.store, self.as_api, service, callback)
self.txn_ctrl = _TransactionController(
self.clock, self.store, self.as_api, create_recoverer
)
self.txn_ctrl = _TransactionController(self.clock, self.store, self.as_api)
self.queuer = _ServiceQueuer(self.txn_ctrl, self.clock)
@defer.inlineCallbacks
def start(self):
logger.info("Starting appservice scheduler")
# check for any DOWN ASes and start recoverers for them.
recoverers = yield _Recoverer.start(
self.clock, self.store, self.as_api, self.txn_ctrl.on_recovered
services = yield self.store.get_appservices_by_state(
ApplicationServiceState.DOWN
)
self.txn_ctrl.add_recoverers(recoverers)
for service in services:
self.txn_ctrl.start_recoverer(service)
def submit_event_for_as(self, service, event):
self.queuer.enqueue(service, event)
class _ServiceQueuer(object):
"""Queues events for the same application service together, sending
transactions as soon as possible. Once a transaction is sent successfully,
this schedules any other events in the queue to run.
"""Queue of events waiting to be sent to appservices.
Groups events into transactions per-appservice, and sends them on to the
TransactionController. Makes sure that we only have one transaction in flight per
appservice at a given time.
"""
def __init__(self, txn_ctrl, clock):
self.queued_events = {} # dict of {service_id: [events]}
# the appservices which currently have a transaction in flight
self.requests_in_flight = set()
self.txn_ctrl = txn_ctrl
self.clock = clock
@@ -136,13 +138,29 @@ class _ServiceQueuer(object):
class _TransactionController(object):
def __init__(self, clock, store, as_api, recoverer_fn):
"""Transaction manager.
Builds AppServiceTransactions and runs their lifecycle. Also starts a Recoverer
if a transaction fails.
(Note we have only have one of these in the homeserver.)
Args:
clock (synapse.util.Clock):
store (synapse.storage.DataStore):
as_api (synapse.appservice.api.ApplicationServiceApi):
"""
def __init__(self, clock, store, as_api):
self.clock = clock
self.store = store
self.as_api = as_api
self.recoverer_fn = recoverer_fn
# keep track of how many recoverers there are
self.recoverers = []
# map from service id to recoverer instance
self.recoverers = {}
# for UTs
self.RECOVERER_CLASS = _Recoverer
@defer.inlineCallbacks
def send(self, service, events):
@@ -154,42 +172,45 @@ class _TransactionController(object):
if sent:
yield txn.complete(self.store)
else:
run_in_background(self._start_recoverer, service)
run_in_background(self._on_txn_fail, service)
except Exception:
logger.exception("Error creating appservice transaction")
run_in_background(self._start_recoverer, service)
run_in_background(self._on_txn_fail, service)
@defer.inlineCallbacks
def on_recovered(self, recoverer):
self.recoverers.remove(recoverer)
logger.info(
"Successfully recovered application service AS ID %s", recoverer.service.id
)
self.recoverers.pop(recoverer.service.id)
logger.info("Remaining active recoverers: %s", len(self.recoverers))
yield self.store.set_appservice_state(
recoverer.service, ApplicationServiceState.UP
)
def add_recoverers(self, recoverers):
for r in recoverers:
self.recoverers.append(r)
if len(recoverers) > 0:
logger.info("New active recoverers: %s", len(self.recoverers))
@defer.inlineCallbacks
def _start_recoverer(self, service):
def _on_txn_fail(self, service):
try:
yield self.store.set_appservice_state(service, ApplicationServiceState.DOWN)
logger.info(
"Application service falling behind. Starting recoverer. AS ID %s",
service.id,
)
recoverer = self.recoverer_fn(service, self.on_recovered)
self.add_recoverers([recoverer])
recoverer.recover()
self.start_recoverer(service)
except Exception:
logger.exception("Error starting AS recoverer")
def start_recoverer(self, service):
"""Start a Recoverer for the given service
Args:
service (synapse.appservice.ApplicationService):
"""
logger.info("Starting recoverer for AS ID %s", service.id)
assert service.id not in self.recoverers
recoverer = self.RECOVERER_CLASS(
self.clock, self.store, self.as_api, service, self.on_recovered
)
self.recoverers[service.id] = recoverer
recoverer.recover()
logger.info("Now %i active recoverers", len(self.recoverers))
@defer.inlineCallbacks
def _is_service_up(self, service):
state = yield self.store.get_appservice_state(service)
@@ -197,18 +218,17 @@ class _TransactionController(object):
class _Recoverer(object):
@staticmethod
@defer.inlineCallbacks
def start(clock, store, as_api, callback):
services = yield store.get_appservices_by_state(ApplicationServiceState.DOWN)
recoverers = [_Recoverer(clock, store, as_api, s, callback) for s in services]
for r in recoverers:
logger.info(
"Starting recoverer for AS ID %s which was marked as " "DOWN",
r.service.id,
)
r.recover()
return recoverers
"""Manages retries and backoff for a DOWN appservice.
We have one of these for each appservice which is currently considered DOWN.
Args:
clock (synapse.util.Clock):
store (synapse.storage.DataStore):
as_api (synapse.appservice.api.ApplicationServiceApi):
service (synapse.appservice.ApplicationService): the service we are managing
callback (callable[_Recoverer]): called once the service recovers.
"""
def __init__(self, clock, store, as_api, service, callback):
self.clock = clock
@@ -224,7 +244,9 @@ class _Recoverer(object):
"as-recoverer-%s" % (self.service.id,), self.retry
)
self.clock.call_later((2 ** self.backoff_counter), _retry)
delay = 2 ** self.backoff_counter
logger.info("Scheduling retries on %s in %fs", self.service.id, delay)
self.clock.call_later(delay, _retry)
def _backoff(self):
# cap the backoff to be around 8.5min => (2^9) = 512 secs
@@ -234,25 +256,30 @@ class _Recoverer(object):
@defer.inlineCallbacks
def retry(self):
logger.info("Starting retries on %s", self.service.id)
try:
txn = yield self.store.get_oldest_unsent_txn(self.service)
if txn:
while True:
txn = yield self.store.get_oldest_unsent_txn(self.service)
if not txn:
# nothing left: we're done!
self.callback(self)
return
logger.info(
"Retrying transaction %s for AS ID %s", txn.id, txn.service.id
)
sent = yield txn.send(self.as_api)
if sent:
yield txn.complete(self.store)
# reset the backoff counter and retry immediately
self.backoff_counter = 1
yield self.retry()
else:
self._backoff()
else:
self._set_service_recovered()
except Exception as e:
logger.exception(e)
self._backoff()
if not sent:
break
def _set_service_recovered(self):
self.callback(self)
yield txn.complete(self.store)
# reset the backoff counter and then process the next transaction
self.backoff_counter = 1
except Exception:
logger.exception("Unexpected error running retries")
# we didn't manage to send all of the transactions before we got an error of
# some flavour: reschedule the next retry.
self._backoff()

View File

@@ -115,7 +115,7 @@ class EmailConfig(Config):
missing.append("email." + k)
if config.get("public_baseurl") is None:
missing.append("public_base_url")
missing.append("public_baseurl")
if len(missing) > 0:
raise RuntimeError(

View File

@@ -76,7 +76,7 @@ class KeyConfig(Config):
config_dir_path, config["server_name"] + ".signing.key"
)
self.signing_key = self.read_signing_key(signing_key_path)
self.signing_key = self.read_signing_keys(signing_key_path, "signing_key")
self.old_signing_keys = self.read_old_signing_keys(
config.get("old_signing_keys", {})
@@ -85,6 +85,14 @@ class KeyConfig(Config):
config.get("key_refresh_interval", "1d")
)
key_server_signing_keys_path = config.get("key_server_signing_keys_path")
if key_server_signing_keys_path:
self.key_server_signing_keys = self.read_signing_keys(
key_server_signing_keys_path, "key_server_signing_keys_path"
)
else:
self.key_server_signing_keys = list(self.signing_key)
# if neither trusted_key_servers nor perspectives are given, use the default.
if "perspectives" not in config and "trusted_key_servers" not in config:
key_servers = [{"server_name": "matrix.org"}]
@@ -210,16 +218,34 @@ class KeyConfig(Config):
#
#trusted_key_servers:
# - server_name: "matrix.org"
#
# The signing keys to use when acting as a trusted key server. If not specified
# defaults to the server signing key.
#
# Can contain multiple keys, one per line.
#
#key_server_signing_keys_path: "key_server_signing_keys.key"
"""
% locals()
)
def read_signing_key(self, signing_key_path):
signing_keys = self.read_file(signing_key_path, "signing_key")
def read_signing_keys(self, signing_key_path, name):
"""Read the signing keys in the given path.
Args:
signing_key_path (str)
name (str): Associated config key name
Returns:
list[SigningKey]
"""
signing_keys = self.read_file(signing_key_path, name)
try:
return read_signing_keys(signing_keys.splitlines(True))
except Exception as e:
raise ConfigError("Error reading signing_key: %s" % (str(e)))
raise ConfigError("Error reading %s: %s" % (name, str(e)))
def read_old_signing_keys(self, old_signing_keys):
keys = {}

View File

@@ -27,19 +27,16 @@ class StatsConfig(Config):
def read_config(self, config, **kwargs):
self.stats_enabled = True
self.stats_bucket_size = 86400
self.stats_bucket_size = 86400 * 1000
self.stats_retention = sys.maxsize
stats_config = config.get("stats", None)
if stats_config:
self.stats_enabled = stats_config.get("enabled", self.stats_enabled)
self.stats_bucket_size = (
self.parse_duration(stats_config.get("bucket_size", "1d")) / 1000
self.stats_bucket_size = self.parse_duration(
stats_config.get("bucket_size", "1d")
)
self.stats_retention = (
self.parse_duration(
stats_config.get("retention", "%ds" % (sys.maxsize,))
)
/ 1000
self.stats_retention = self.parse_duration(
stats_config.get("retention", "%ds" % (sys.maxsize,))
)
def generate_config_section(self, config_dir_path, server_name, **kwargs):

View File

@@ -18,7 +18,6 @@ import logging
from collections import defaultdict
import six
from six import raise_from
from six.moves import urllib
import attr
@@ -30,7 +29,6 @@ from signedjson.key import (
from signedjson.sign import (
SignatureVerifyException,
encode_canonical_json,
sign_json,
signature_ids,
verify_signed_json,
)
@@ -540,13 +538,7 @@ class BaseV2KeyFetcher(object):
verify_key=verify_key, valid_until_ts=key_data["expired_ts"]
)
# re-sign the json with our own key, so that it is ready if we are asked to
# give it out as a notary server
signed_key_json = sign_json(
response_json, self.config.server_name, self.config.signing_key[0]
)
signed_key_json_bytes = encode_canonical_json(signed_key_json)
key_json_bytes = encode_canonical_json(response_json)
yield make_deferred_yieldable(
defer.gatherResults(
@@ -558,7 +550,7 @@ class BaseV2KeyFetcher(object):
from_server=from_server,
ts_now_ms=time_added_ms,
ts_expires_ms=ts_valid_until_ms,
key_json_bytes=signed_key_json_bytes,
key_json_bytes=key_json_bytes,
)
for key_id in verify_keys
],
@@ -657,9 +649,10 @@ class PerspectivesKeyFetcher(BaseV2KeyFetcher):
},
)
except (NotRetryingDestination, RequestSendFailed) as e:
raise_from(KeyLookupError("Failed to connect to remote server"), e)
# these both have str() representations which we can't really improve upon
raise KeyLookupError(str(e))
except HttpResponseException as e:
raise_from(KeyLookupError("Remote server returned an error"), e)
raise KeyLookupError("Remote server returned an error: %s" % (e,))
keys = {}
added_keys = []
@@ -821,9 +814,11 @@ class ServerKeyFetcher(BaseV2KeyFetcher):
timeout=10000,
)
except (NotRetryingDestination, RequestSendFailed) as e:
raise_from(KeyLookupError("Failed to connect to remote server"), e)
# these both have str() representations which we can't really improve
# upon
raise KeyLookupError(str(e))
except HttpResponseException as e:
raise_from(KeyLookupError("Remote server returned an error"), e)
raise KeyLookupError("Remote server returned an error: %s" % (e,))
if response["server_name"] != server_name:
raise KeyLookupError(

View File

@@ -43,6 +43,7 @@ from synapse.federation.persistence import TransactionActions
from synapse.federation.units import Edu, Transaction
from synapse.http.endpoint import parse_server_name
from synapse.logging.context import nested_logging_context
from synapse.logging.opentracing import log_kv, start_active_span_from_edu, trace
from synapse.logging.utils import log_function
from synapse.replication.http.federation import (
ReplicationFederationSendEduRestServlet,
@@ -507,6 +508,7 @@ class FederationServer(FederationBase):
def on_query_user_devices(self, origin, user_id):
return self.on_query_request("user_devices", user_id)
@trace
@defer.inlineCallbacks
@log_function
def on_claim_client_keys(self, origin, content):
@@ -515,6 +517,7 @@ class FederationServer(FederationBase):
for device_id, algorithm in device_keys.items():
query.append((user_id, device_id, algorithm))
log_kv({"message": "Claiming one time keys.", "user, device pairs": query})
results = yield self.store.claim_e2e_one_time_keys(query)
json_result = {}
@@ -808,12 +811,13 @@ class FederationHandlerRegistry(object):
if not handler:
logger.warn("No handler registered for EDU type %s", edu_type)
try:
yield handler(origin, content)
except SynapseError as e:
logger.info("Failed to handle edu %r: %r", edu_type, e)
except Exception:
logger.exception("Failed to handle edu %r", edu_type)
with start_active_span_from_edu(content, "handle_edu"):
try:
yield handler(origin, content)
except SynapseError as e:
logger.info("Failed to handle edu %r: %r", edu_type, e)
except Exception:
logger.exception("Failed to handle edu %r", edu_type)
def on_query(self, query_type, args):
handler = self.query_handlers.get(query_type)

View File

@@ -14,11 +14,19 @@
# limitations under the License.
import logging
from canonicaljson import json
from twisted.internet import defer
from synapse.api.errors import HttpResponseException
from synapse.federation.persistence import TransactionActions
from synapse.federation.units import Transaction
from synapse.logging.opentracing import (
extract_text_map,
set_tag,
start_active_span_follows_from,
tags,
)
from synapse.util.metrics import measure_func
logger = logging.getLogger(__name__)
@@ -44,93 +52,109 @@ class TransactionManager(object):
@defer.inlineCallbacks
def send_new_transaction(self, destination, pending_pdus, pending_edus):
# Sort based on the order field
pending_pdus.sort(key=lambda t: t[1])
pdus = [x[0] for x in pending_pdus]
edus = pending_edus
# Make a transaction-sending opentracing span. This span follows on from
# all the edus in that transaction. This needs to be done since there is
# no active span here, so if the edus were not received by the remote the
# span would have no causality and it would be forgotten.
# The span_contexts is a generator so that it won't be evaluated if
# opentracing is disabled. (Yay speed!)
success = True
logger.debug("TX [%s] _attempt_new_transaction", destination)
txn_id = str(self._next_txn_id)
logger.debug(
"TX [%s] {%s} Attempting new transaction" " (pdus: %d, edus: %d)",
destination,
txn_id,
len(pdus),
len(edus),
span_contexts = (
extract_text_map(json.loads(edu.get_context())) for edu in pending_edus
)
transaction = Transaction.create_new(
origin_server_ts=int(self.clock.time_msec()),
transaction_id=txn_id,
origin=self._server_name,
destination=destination,
pdus=pdus,
edus=edus,
)
with start_active_span_follows_from("send_transaction", span_contexts):
self._next_txn_id += 1
# Sort based on the order field
pending_pdus.sort(key=lambda t: t[1])
pdus = [x[0] for x in pending_pdus]
edus = pending_edus
logger.info(
"TX [%s] {%s} Sending transaction [%s]," " (PDUs: %d, EDUs: %d)",
destination,
txn_id,
transaction.transaction_id,
len(pdus),
len(edus),
)
success = True
# Actually send the transaction
logger.debug("TX [%s] _attempt_new_transaction", destination)
# FIXME (erikj): This is a bit of a hack to make the Pdu age
# keys work
def json_data_cb():
data = transaction.get_dict()
now = int(self.clock.time_msec())
if "pdus" in data:
for p in data["pdus"]:
if "age_ts" in p:
unsigned = p.setdefault("unsigned", {})
unsigned["age"] = now - int(p["age_ts"])
del p["age_ts"]
return data
txn_id = str(self._next_txn_id)
try:
response = yield self._transport_layer.send_transaction(
transaction, json_data_cb
logger.debug(
"TX [%s] {%s} Attempting new transaction" " (pdus: %d, edus: %d)",
destination,
txn_id,
len(pdus),
len(edus),
)
code = 200
except HttpResponseException as e:
code = e.code
response = e.response
if e.code in (401, 404, 429) or 500 <= e.code:
logger.info("TX [%s] {%s} got %d response", destination, txn_id, code)
raise e
transaction = Transaction.create_new(
origin_server_ts=int(self.clock.time_msec()),
transaction_id=txn_id,
origin=self._server_name,
destination=destination,
pdus=pdus,
edus=edus,
)
logger.info("TX [%s] {%s} got %d response", destination, txn_id, code)
self._next_txn_id += 1
if code == 200:
for e_id, r in response.get("pdus", {}).items():
if "error" in r:
logger.info(
"TX [%s] {%s} Sending transaction [%s]," " (PDUs: %d, EDUs: %d)",
destination,
txn_id,
transaction.transaction_id,
len(pdus),
len(edus),
)
# Actually send the transaction
# FIXME (erikj): This is a bit of a hack to make the Pdu age
# keys work
def json_data_cb():
data = transaction.get_dict()
now = int(self.clock.time_msec())
if "pdus" in data:
for p in data["pdus"]:
if "age_ts" in p:
unsigned = p.setdefault("unsigned", {})
unsigned["age"] = now - int(p["age_ts"])
del p["age_ts"]
return data
try:
response = yield self._transport_layer.send_transaction(
transaction, json_data_cb
)
code = 200
except HttpResponseException as e:
code = e.code
response = e.response
if e.code in (401, 404, 429) or 500 <= e.code:
logger.info(
"TX [%s] {%s} got %d response", destination, txn_id, code
)
raise e
logger.info("TX [%s] {%s} got %d response", destination, txn_id, code)
if code == 200:
for e_id, r in response.get("pdus", {}).items():
if "error" in r:
logger.warn(
"TX [%s] {%s} Remote returned error for %s: %s",
destination,
txn_id,
e_id,
r,
)
else:
for p in pdus:
logger.warn(
"TX [%s] {%s} Remote returned error for %s: %s",
"TX [%s] {%s} Failed to send event %s",
destination,
txn_id,
e_id,
r,
p.event_id,
)
else:
for p in pdus:
logger.warn(
"TX [%s] {%s} Failed to send event %s",
destination,
txn_id,
p.event_id,
)
success = False
success = False
return success
set_tag(tags.ERROR, not success)
return success

View File

@@ -38,7 +38,12 @@ from synapse.http.servlet import (
parse_string_from_args,
)
from synapse.logging.context import run_in_background
from synapse.logging.opentracing import start_active_span_from_context, tags
from synapse.logging.opentracing import (
start_active_span,
start_active_span_from_request,
tags,
whitelisted_homeserver,
)
from synapse.types import ThirdPartyInstanceID, get_domain_from_id
from synapse.util.ratelimitutils import FederationRateLimiter
from synapse.util.versionstring import get_version_string
@@ -288,20 +293,28 @@ class BaseFederationServlet(object):
logger.warn("authenticate_request failed: %s", e)
raise
# Start an opentracing span
with start_active_span_from_context(
request.requestHeaders,
"incoming-federation-request",
tags={
"request_id": request.get_request_id(),
tags.SPAN_KIND: tags.SPAN_KIND_RPC_SERVER,
tags.HTTP_METHOD: request.get_method(),
tags.HTTP_URL: request.get_redacted_uri(),
tags.PEER_HOST_IPV6: request.getClientIP(),
"authenticated_entity": origin,
"servlet_name": request.request_metrics.name,
},
):
request_tags = {
"request_id": request.get_request_id(),
tags.SPAN_KIND: tags.SPAN_KIND_RPC_SERVER,
tags.HTTP_METHOD: request.get_method(),
tags.HTTP_URL: request.get_redacted_uri(),
tags.PEER_HOST_IPV6: request.getClientIP(),
"authenticated_entity": origin,
"servlet_name": request.request_metrics.name,
}
# Only accept the span context if the origin is authenticated
# and whitelisted
if origin and whitelisted_homeserver(origin):
scope = start_active_span_from_request(
request, "incoming-federation-request", tags=request_tags
)
else:
scope = start_active_span(
"incoming-federation-request", tags=request_tags
)
with scope:
if origin:
with ratelimiter.ratelimit(origin) as d:
await d

View File

@@ -38,6 +38,9 @@ class Edu(JsonEncodedObject):
internal_keys = ["origin", "destination"]
def get_context(self):
return getattr(self, "content", {}).get("org.matrix.opentracing_context", "{}")
class Transaction(JsonEncodedObject):
""" A transaction is a list of Pdus and Edus to be sent to a remote home

View File

@@ -15,9 +15,17 @@
import logging
from canonicaljson import json
from twisted.internet import defer
from synapse.api.errors import SynapseError
from synapse.logging.opentracing import (
get_active_span_text_map,
set_tag,
start_active_span,
whitelisted_homeserver,
)
from synapse.types import UserID, get_domain_from_id
from synapse.util.stringutils import random_string
@@ -100,14 +108,21 @@ class DeviceMessageHandler(object):
message_id = random_string(16)
context = get_active_span_text_map()
remote_edu_contents = {}
for destination, messages in remote_messages.items():
remote_edu_contents[destination] = {
"messages": messages,
"sender": sender_user_id,
"type": message_type,
"message_id": message_id,
}
with start_active_span("to_device_for_user"):
set_tag("destination", destination)
remote_edu_contents[destination] = {
"messages": messages,
"sender": sender_user_id,
"type": message_type,
"message_id": message_id,
"org.matrix.opentracing_context": json.dumps(context)
if whitelisted_homeserver(destination)
else None,
}
stream_id = yield self.store.add_messages_to_device_inbox(
local_messages, remote_edu_contents

View File

@@ -24,6 +24,7 @@ from twisted.internet import defer
from synapse.api.errors import CodeMessageException, SynapseError
from synapse.logging.context import make_deferred_yieldable, run_in_background
from synapse.logging.opentracing import log_kv, set_tag, tag_args, trace
from synapse.types import UserID, get_domain_from_id
from synapse.util import unwrapFirstError
from synapse.util.retryutils import NotRetryingDestination
@@ -46,6 +47,7 @@ class E2eKeysHandler(object):
"client_keys", self.on_federation_query_client_keys
)
@trace
@defer.inlineCallbacks
def query_devices(self, query_body, timeout):
""" Handle a device key query from a client
@@ -81,6 +83,9 @@ class E2eKeysHandler(object):
else:
remote_queries[user_id] = device_ids
set_tag("local_key_query", local_query)
set_tag("remote_key_query", remote_queries)
# First get local devices.
failures = {}
results = {}
@@ -121,6 +126,7 @@ class E2eKeysHandler(object):
r[user_id] = remote_queries[user_id]
# Now fetch any devices that we don't have in our cache
@trace
@defer.inlineCallbacks
def do_remote_query(destination):
"""This is called when we are querying the device list of a user on
@@ -185,6 +191,8 @@ class E2eKeysHandler(object):
except Exception as e:
failure = _exception_to_failure(e)
failures[destination] = failure
set_tag("error", True)
set_tag("reason", failure)
yield make_deferred_yieldable(
defer.gatherResults(
@@ -198,6 +206,7 @@ class E2eKeysHandler(object):
return {"device_keys": results, "failures": failures}
@trace
@defer.inlineCallbacks
def query_local_devices(self, query):
"""Get E2E device keys for local users
@@ -210,6 +219,7 @@ class E2eKeysHandler(object):
defer.Deferred: (resolves to dict[string, dict[string, dict]]):
map from user_id -> device_id -> device details
"""
set_tag("local_query", query)
local_query = []
result_dict = {}
@@ -217,6 +227,14 @@ class E2eKeysHandler(object):
# we use UserID.from_string to catch invalid user ids
if not self.is_mine(UserID.from_string(user_id)):
logger.warning("Request for keys for non-local user %s", user_id)
log_kv(
{
"message": "Requested a local key for a user which"
" was not local to the homeserver",
"user_id": user_id,
}
)
set_tag("error", True)
raise SynapseError(400, "Not a user here")
if not device_ids:
@@ -241,6 +259,7 @@ class E2eKeysHandler(object):
r["unsigned"]["device_display_name"] = display_name
result_dict[user_id][device_id] = r
log_kv(results)
return result_dict
@defer.inlineCallbacks
@@ -251,6 +270,7 @@ class E2eKeysHandler(object):
res = yield self.query_local_devices(device_keys_query)
return {"device_keys": res}
@trace
@defer.inlineCallbacks
def claim_one_time_keys(self, query, timeout):
local_query = []
@@ -265,6 +285,9 @@ class E2eKeysHandler(object):
domain = get_domain_from_id(user_id)
remote_queries.setdefault(domain, {})[user_id] = device_keys
set_tag("local_key_query", local_query)
set_tag("remote_key_query", remote_queries)
results = yield self.store.claim_e2e_one_time_keys(local_query)
json_result = {}
@@ -276,8 +299,10 @@ class E2eKeysHandler(object):
key_id: json.loads(json_bytes)
}
@trace
@defer.inlineCallbacks
def claim_client_keys(destination):
set_tag("destination", destination)
device_keys = remote_queries[destination]
try:
remote_result = yield self.federation.claim_client_keys(
@@ -290,6 +315,8 @@ class E2eKeysHandler(object):
except Exception as e:
failure = _exception_to_failure(e)
failures[destination] = failure
set_tag("error", True)
set_tag("reason", failure)
yield make_deferred_yieldable(
defer.gatherResults(
@@ -313,9 +340,11 @@ class E2eKeysHandler(object):
),
)
log_kv({"one_time_keys": json_result, "failures": failures})
return {"one_time_keys": json_result, "failures": failures}
@defer.inlineCallbacks
@tag_args
def upload_keys_for_user(self, user_id, device_id, keys):
time_now = self.clock.time_msec()
@@ -329,6 +358,13 @@ class E2eKeysHandler(object):
user_id,
time_now,
)
log_kv(
{
"message": "Updating device_keys for user.",
"user_id": user_id,
"device_id": device_id,
}
)
# TODO: Sign the JSON with the server key
changed = yield self.store.set_e2e_device_keys(
user_id, device_id, time_now, device_keys
@@ -336,12 +372,24 @@ class E2eKeysHandler(object):
if changed:
# Only notify about device updates *if* the keys actually changed
yield self.device_handler.notify_device_update(user_id, [device_id])
else:
log_kv({"message": "Not updating device_keys for user", "user_id": user_id})
one_time_keys = keys.get("one_time_keys", None)
if one_time_keys:
log_kv(
{
"message": "Updating one_time_keys for device.",
"user_id": user_id,
"device_id": device_id,
}
)
yield self._upload_one_time_keys_for_user(
user_id, device_id, time_now, one_time_keys
)
else:
log_kv(
{"message": "Did not update one_time_keys", "reason": "no keys given"}
)
# the device should have been registered already, but it may have been
# deleted due to a race with a DELETE request. Or we may be using an
@@ -352,6 +400,7 @@ class E2eKeysHandler(object):
result = yield self.store.count_e2e_one_time_keys(user_id, device_id)
set_tag("one_time_key_counts", result)
return {"one_time_key_counts": result}
@defer.inlineCallbacks
@@ -395,6 +444,7 @@ class E2eKeysHandler(object):
(algorithm, key_id, encode_canonical_json(key).decode("ascii"))
)
log_kv({"message": "Inserting new one_time_keys.", "keys": new_keys})
yield self.store.add_e2e_one_time_keys(user_id, device_id, time_now, new_keys)

View File

@@ -26,6 +26,7 @@ from synapse.api.errors import (
StoreError,
SynapseError,
)
from synapse.logging.opentracing import log_kv, trace
from synapse.util.async_helpers import Linearizer
logger = logging.getLogger(__name__)
@@ -49,6 +50,7 @@ class E2eRoomKeysHandler(object):
# changed.
self._upload_linearizer = Linearizer("upload_room_keys_lock")
@trace
@defer.inlineCallbacks
def get_room_keys(self, user_id, version, room_id=None, session_id=None):
"""Bulk get the E2E room keys for a given backup, optionally filtered to a given
@@ -84,8 +86,10 @@ class E2eRoomKeysHandler(object):
user_id, version, room_id, session_id
)
log_kv(results)
return results
@trace
@defer.inlineCallbacks
def delete_room_keys(self, user_id, version, room_id=None, session_id=None):
"""Bulk delete the E2E room keys for a given backup, optionally filtered to a given
@@ -107,6 +111,7 @@ class E2eRoomKeysHandler(object):
with (yield self._upload_linearizer.queue(user_id)):
yield self.store.delete_e2e_room_keys(user_id, version, room_id, session_id)
@trace
@defer.inlineCallbacks
def upload_room_keys(self, user_id, version, room_keys):
"""Bulk upload a list of room keys into a given backup version, asserting
@@ -186,7 +191,14 @@ class E2eRoomKeysHandler(object):
session_id(str): the session whose room_key we're setting
room_key(dict): the room_key being set
"""
log_kv(
{
"message": "Trying to upload room key",
"room_id": room_id,
"session_id": session_id,
"user_id": user_id,
}
)
# get the room_key for this particular row
current_room_key = None
try:
@@ -195,14 +207,23 @@ class E2eRoomKeysHandler(object):
)
except StoreError as e:
if e.code == 404:
pass
log_kv(
{
"message": "Room key not found.",
"room_id": room_id,
"user_id": user_id,
}
)
else:
raise
if self._should_replace_room_key(current_room_key, room_key):
log_kv({"message": "Replacing room key."})
yield self.store.set_e2e_room_key(
user_id, version, room_id, session_id, room_key
)
else:
log_kv({"message": "Not replacing room_key."})
@staticmethod
def _should_replace_room_key(current_room_key, room_key):
@@ -236,6 +257,7 @@ class E2eRoomKeysHandler(object):
return False
return True
@trace
@defer.inlineCallbacks
def create_version(self, user_id, version_info):
"""Create a new backup version. This automatically becomes the new
@@ -294,6 +316,7 @@ class E2eRoomKeysHandler(object):
raise
return res
@trace
@defer.inlineCallbacks
def delete_version(self, user_id, version=None):
"""Deletes a given version of the user's e2e_room_keys backup
@@ -314,6 +337,7 @@ class E2eRoomKeysHandler(object):
else:
raise
@trace
@defer.inlineCallbacks
def update_version(self, user_id, version, version_info):
"""Update the info about a given version of the user's backup

View File

@@ -70,6 +70,7 @@ class PaginationHandler(object):
self.auth = hs.get_auth()
self.store = hs.get_datastore()
self.clock = hs.get_clock()
self._server_name = hs.hostname
self.pagination_lock = ReadWriteLock()
self._purges_in_progress_by_room = set()
@@ -153,6 +154,22 @@ class PaginationHandler(object):
"""
return self._purges_by_id.get(purge_id)
async def purge_room(self, room_id):
"""Purge the given room from the database"""
with (await self.pagination_lock.write(room_id)):
# check we know about the room
await self.store.get_room_version(room_id)
# first check that we have no users in this room
joined = await defer.maybeDeferred(
self.store.is_host_joined, room_id, self._server_name
)
if joined:
raise SynapseError(400, "Users are still joined to this room")
await self.store.purge_room(room_id)
@defer.inlineCallbacks
def get_messages(
self,

View File

@@ -34,7 +34,7 @@ from ._base import BaseHandler
logger = logging.getLogger(__name__)
MAX_DISPLAYNAME_LEN = 100
MAX_DISPLAYNAME_LEN = 256
MAX_AVATAR_URL_LEN = 1000

View File

@@ -15,14 +15,7 @@
import logging
from twisted.internet import defer
from synapse.api.constants import EventTypes, JoinRules, Membership
from synapse.handlers.state_deltas import StateDeltasHandler
from synapse.metrics import event_processing_positions
from synapse.metrics.background_process_metrics import run_as_background_process
from synapse.types import UserID
from synapse.util.metrics import Measure
logger = logging.getLogger(__name__)
@@ -49,9 +42,6 @@ class StatsHandler(StateDeltasHandler):
# The current position in the current_state_delta stream
self.pos = None
# Guard to ensure we only process deltas one at a time
self._is_processing = False
if hs.config.stats_enabled:
self.notifier.add_replication_callback(self.notify_new_event)
@@ -62,272 +52,4 @@ class StatsHandler(StateDeltasHandler):
def notify_new_event(self):
"""Called when there may be more deltas to process
"""
if not self.hs.config.stats_enabled:
return
if self._is_processing:
return
@defer.inlineCallbacks
def process():
try:
yield self._unsafe_process()
finally:
self._is_processing = False
self._is_processing = True
run_as_background_process("stats.notify_new_event", process)
@defer.inlineCallbacks
def _unsafe_process(self):
# If self.pos is None then means we haven't fetched it from DB
if self.pos is None:
self.pos = yield self.store.get_stats_stream_pos()
# If still None then the initial background update hasn't happened yet
if self.pos is None:
return None
# Loop round handling deltas until we're up to date
while True:
with Measure(self.clock, "stats_delta"):
deltas = yield self.store.get_current_state_deltas(self.pos)
if not deltas:
return
logger.info("Handling %d state deltas", len(deltas))
yield self._handle_deltas(deltas)
self.pos = deltas[-1]["stream_id"]
yield self.store.update_stats_stream_pos(self.pos)
event_processing_positions.labels("stats").set(self.pos)
@defer.inlineCallbacks
def _handle_deltas(self, deltas):
"""
Called with the state deltas to process
"""
for delta in deltas:
typ = delta["type"]
state_key = delta["state_key"]
room_id = delta["room_id"]
event_id = delta["event_id"]
stream_id = delta["stream_id"]
prev_event_id = delta["prev_event_id"]
stream_pos = delta["stream_id"]
logger.debug("Handling: %r %r, %s", typ, state_key, event_id)
token = yield self.store.get_earliest_token_for_room_stats(room_id)
# If the earliest token to begin from is larger than our current
# stream ID, skip processing this delta.
if token is not None and token >= stream_id:
logger.debug(
"Ignoring: %s as earlier than this room's initial ingestion event",
event_id,
)
continue
if event_id is None and prev_event_id is None:
# Errr...
continue
event_content = {}
if event_id is not None:
event = yield self.store.get_event(event_id, allow_none=True)
if event:
event_content = event.content or {}
# We use stream_pos here rather than fetch by event_id as event_id
# may be None
now = yield self.store.get_received_ts_by_stream_pos(stream_pos)
# quantise time to the nearest bucket
now = (now // 1000 // self.stats_bucket_size) * self.stats_bucket_size
if typ == EventTypes.Member:
# we could use _get_key_change here but it's a bit inefficient
# given we're not testing for a specific result; might as well
# just grab the prev_membership and membership strings and
# compare them.
prev_event_content = {}
if prev_event_id is not None:
prev_event = yield self.store.get_event(
prev_event_id, allow_none=True
)
if prev_event:
prev_event_content = prev_event.content
membership = event_content.get("membership", Membership.LEAVE)
prev_membership = prev_event_content.get("membership", Membership.LEAVE)
if prev_membership == membership:
continue
if prev_membership == Membership.JOIN:
yield self.store.update_stats_delta(
now, "room", room_id, "joined_members", -1
)
elif prev_membership == Membership.INVITE:
yield self.store.update_stats_delta(
now, "room", room_id, "invited_members", -1
)
elif prev_membership == Membership.LEAVE:
yield self.store.update_stats_delta(
now, "room", room_id, "left_members", -1
)
elif prev_membership == Membership.BAN:
yield self.store.update_stats_delta(
now, "room", room_id, "banned_members", -1
)
else:
err = "%s is not a valid prev_membership" % (repr(prev_membership),)
logger.error(err)
raise ValueError(err)
if membership == Membership.JOIN:
yield self.store.update_stats_delta(
now, "room", room_id, "joined_members", +1
)
elif membership == Membership.INVITE:
yield self.store.update_stats_delta(
now, "room", room_id, "invited_members", +1
)
elif membership == Membership.LEAVE:
yield self.store.update_stats_delta(
now, "room", room_id, "left_members", +1
)
elif membership == Membership.BAN:
yield self.store.update_stats_delta(
now, "room", room_id, "banned_members", +1
)
else:
err = "%s is not a valid membership" % (repr(membership),)
logger.error(err)
raise ValueError(err)
user_id = state_key
if self.is_mine_id(user_id):
# update user_stats as it's one of our users
public = yield self._is_public_room(room_id)
if membership == Membership.LEAVE:
yield self.store.update_stats_delta(
now,
"user",
user_id,
"public_rooms" if public else "private_rooms",
-1,
)
elif membership == Membership.JOIN:
yield self.store.update_stats_delta(
now,
"user",
user_id,
"public_rooms" if public else "private_rooms",
+1,
)
elif typ == EventTypes.Create:
# Newly created room. Add it with all blank portions.
yield self.store.update_room_state(
room_id,
{
"join_rules": None,
"history_visibility": None,
"encryption": None,
"name": None,
"topic": None,
"avatar": None,
"canonical_alias": None,
},
)
elif typ == EventTypes.JoinRules:
yield self.store.update_room_state(
room_id, {"join_rules": event_content.get("join_rule")}
)
is_public = yield self._get_key_change(
prev_event_id, event_id, "join_rule", JoinRules.PUBLIC
)
if is_public is not None:
yield self.update_public_room_stats(now, room_id, is_public)
elif typ == EventTypes.RoomHistoryVisibility:
yield self.store.update_room_state(
room_id,
{"history_visibility": event_content.get("history_visibility")},
)
is_public = yield self._get_key_change(
prev_event_id, event_id, "history_visibility", "world_readable"
)
if is_public is not None:
yield self.update_public_room_stats(now, room_id, is_public)
elif typ == EventTypes.Encryption:
yield self.store.update_room_state(
room_id, {"encryption": event_content.get("algorithm")}
)
elif typ == EventTypes.Name:
yield self.store.update_room_state(
room_id, {"name": event_content.get("name")}
)
elif typ == EventTypes.Topic:
yield self.store.update_room_state(
room_id, {"topic": event_content.get("topic")}
)
elif typ == EventTypes.RoomAvatar:
yield self.store.update_room_state(
room_id, {"avatar": event_content.get("url")}
)
elif typ == EventTypes.CanonicalAlias:
yield self.store.update_room_state(
room_id, {"canonical_alias": event_content.get("alias")}
)
@defer.inlineCallbacks
def update_public_room_stats(self, ts, room_id, is_public):
"""
Increment/decrement a user's number of public rooms when a room they are
in changes to/from public visibility.
Args:
ts (int): Timestamp in seconds
room_id (str)
is_public (bool)
"""
# For now, blindly iterate over all local users in the room so that
# we can handle the whole problem of copying buckets over as needed
user_ids = yield self.store.get_users_in_room(room_id)
for user_id in user_ids:
if self.hs.is_mine(UserID.from_string(user_id)):
yield self.store.update_stats_delta(
ts, "user", user_id, "public_rooms", +1 if is_public else -1
)
yield self.store.update_stats_delta(
ts, "user", user_id, "private_rooms", -1 if is_public else +1
)
@defer.inlineCallbacks
def _is_public_room(self, room_id):
join_rules = yield self.state.get_current_state(room_id, EventTypes.JoinRules)
history_visibility = yield self.state.get_current_state(
room_id, EventTypes.RoomHistoryVisibility
)
if (join_rules and join_rules.content.get("join_rule") == JoinRules.PUBLIC) or (
(
history_visibility
and history_visibility.content.get("history_visibility")
== "world_readable"
)
):
return True
else:
return False
pass

View File

@@ -51,9 +51,9 @@ class MatrixFederationAgent(object):
SRVResolver impl to use for looking up SRV records. None to use a default
implementation.
_well_known_cache (TTLCache|None):
TTLCache impl for storing cached well-known lookups. None to use a default
implementation.
_well_known_resolver (WellKnownResolver|None):
WellKnownResolver to use to perform well-known lookups. None to use a
default implementation.
"""
def __init__(
@@ -61,7 +61,7 @@ class MatrixFederationAgent(object):
reactor,
tls_client_options_factory,
_srv_resolver=None,
_well_known_cache=None,
_well_known_resolver=None,
):
self._reactor = reactor
self._clock = Clock(reactor)
@@ -76,15 +76,17 @@ class MatrixFederationAgent(object):
self._pool.maxPersistentPerHost = 5
self._pool.cachedConnectionTimeout = 2 * 60
self._well_known_resolver = WellKnownResolver(
self._reactor,
agent=Agent(
if _well_known_resolver is None:
_well_known_resolver = WellKnownResolver(
self._reactor,
pool=self._pool,
contextFactory=tls_client_options_factory,
),
well_known_cache=_well_known_cache,
)
agent=Agent(
self._reactor,
pool=self._pool,
contextFactory=tls_client_options_factory,
),
)
self._well_known_resolver = _well_known_resolver
@defer.inlineCallbacks
def request(self, method, uri, headers=None, bodyProducer=None):

View File

@@ -32,12 +32,19 @@ from synapse.util.metrics import Measure
# period to cache .well-known results for by default
WELL_KNOWN_DEFAULT_CACHE_PERIOD = 24 * 3600
# jitter to add to the .well-known default cache ttl
WELL_KNOWN_DEFAULT_CACHE_PERIOD_JITTER = 10 * 60
# jitter factor to add to the .well-known default cache ttls
WELL_KNOWN_DEFAULT_CACHE_PERIOD_JITTER = 0.1
# period to cache failure to fetch .well-known for
WELL_KNOWN_INVALID_CACHE_PERIOD = 1 * 3600
# period to cache failure to fetch .well-known if there has recently been a
# valid well-known for that domain.
WELL_KNOWN_DOWN_CACHE_PERIOD = 2 * 60
# period to remember there was a valid well-known after valid record expires
WELL_KNOWN_REMEMBER_DOMAIN_HAD_VALID = 2 * 3600
# cap for .well-known cache period
WELL_KNOWN_MAX_CACHE_PERIOD = 48 * 3600
@@ -49,11 +56,16 @@ WELL_KNOWN_MIN_CACHE_PERIOD = 5 * 60
# we'll start trying to refetch 1 minute before it expires.
WELL_KNOWN_GRACE_PERIOD_FACTOR = 0.2
# Number of times we retry fetching a well-known for a domain we know recently
# had a valid entry.
WELL_KNOWN_RETRY_ATTEMPTS = 3
logger = logging.getLogger(__name__)
_well_known_cache = TTLCache("well-known")
_had_valid_well_known_cache = TTLCache("had-valid-well-known")
@attr.s(slots=True, frozen=True)
@@ -65,14 +77,20 @@ class WellKnownResolver(object):
"""Handles well-known lookups for matrix servers.
"""
def __init__(self, reactor, agent, well_known_cache=None):
def __init__(
self, reactor, agent, well_known_cache=None, had_well_known_cache=None
):
self._reactor = reactor
self._clock = Clock(reactor)
if well_known_cache is None:
well_known_cache = _well_known_cache
if had_well_known_cache is None:
had_well_known_cache = _had_valid_well_known_cache
self._well_known_cache = well_known_cache
self._had_valid_well_known_cache = had_well_known_cache
self._well_known_agent = RedirectAgent(agent)
@defer.inlineCallbacks
@@ -100,7 +118,7 @@ class WellKnownResolver(object):
# requests for the same server in parallel?
try:
with Measure(self._clock, "get_well_known"):
result, cache_period = yield self._do_get_well_known(server_name)
result, cache_period = yield self._fetch_well_known(server_name)
except _FetchWellKnownFailure as e:
if prev_result and e.temporary:
@@ -111,10 +129,18 @@ class WellKnownResolver(object):
result = None
# add some randomness to the TTL to avoid a stampeding herd every hour
# after startup
cache_period = WELL_KNOWN_INVALID_CACHE_PERIOD
cache_period += random.uniform(0, WELL_KNOWN_DEFAULT_CACHE_PERIOD_JITTER)
if self._had_valid_well_known_cache.get(server_name, False):
# We have recently seen a valid well-known record for this
# server, so we cache the lack of well-known for a shorter time.
cache_period = WELL_KNOWN_DOWN_CACHE_PERIOD
else:
cache_period = WELL_KNOWN_INVALID_CACHE_PERIOD
# add some randomness to the TTL to avoid a stampeding herd
cache_period *= random.uniform(
1 - WELL_KNOWN_DEFAULT_CACHE_PERIOD_JITTER,
1 + WELL_KNOWN_DEFAULT_CACHE_PERIOD_JITTER,
)
if cache_period > 0:
self._well_known_cache.set(server_name, result, cache_period)
@@ -122,7 +148,7 @@ class WellKnownResolver(object):
return WellKnownLookupResult(delegated_server=result)
@defer.inlineCallbacks
def _do_get_well_known(self, server_name):
def _fetch_well_known(self, server_name):
"""Actually fetch and parse a .well-known, without checking the cache
Args:
@@ -134,24 +160,15 @@ class WellKnownResolver(object):
Returns:
Deferred[Tuple[bytes,int]]: The lookup result and cache period.
"""
uri = b"https://%s/.well-known/matrix/server" % (server_name,)
uri_str = uri.decode("ascii")
logger.info("Fetching %s", uri_str)
had_valid_well_known = self._had_valid_well_known_cache.get(server_name, False)
# We do this in two steps to differentiate between possibly transient
# errors (e.g. can't connect to host, 503 response) and more permanent
# errors (such as getting a 404 response).
try:
response = yield make_deferred_yieldable(
self._well_known_agent.request(b"GET", uri)
)
body = yield make_deferred_yieldable(readBody(response))
if 500 <= response.code < 600:
raise Exception("Non-200 response %s" % (response.code,))
except Exception as e:
logger.info("Error fetching %s: %s", uri_str, e)
raise _FetchWellKnownFailure(temporary=True)
response, body = yield self._make_well_known_request(
server_name, retry=had_valid_well_known
)
try:
if response.code != 200:
@@ -161,8 +178,11 @@ class WellKnownResolver(object):
logger.info("Response from .well-known: %s", parsed_body)
result = parsed_body["m.server"].encode("ascii")
except defer.CancelledError:
# Bail if we've been cancelled
raise
except Exception as e:
logger.info("Error fetching %s: %s", uri_str, e)
logger.info("Error parsing well-known for %s: %s", server_name, e)
raise _FetchWellKnownFailure(temporary=False)
cache_period = _cache_period_from_headers(
@@ -172,13 +192,69 @@ class WellKnownResolver(object):
cache_period = WELL_KNOWN_DEFAULT_CACHE_PERIOD
# add some randomness to the TTL to avoid a stampeding herd every 24 hours
# after startup
cache_period += random.uniform(0, WELL_KNOWN_DEFAULT_CACHE_PERIOD_JITTER)
cache_period *= random.uniform(
1 - WELL_KNOWN_DEFAULT_CACHE_PERIOD_JITTER,
1 + WELL_KNOWN_DEFAULT_CACHE_PERIOD_JITTER,
)
else:
cache_period = min(cache_period, WELL_KNOWN_MAX_CACHE_PERIOD)
cache_period = max(cache_period, WELL_KNOWN_MIN_CACHE_PERIOD)
# We got a success, mark as such in the cache
self._had_valid_well_known_cache.set(
server_name,
bool(result),
cache_period + WELL_KNOWN_REMEMBER_DOMAIN_HAD_VALID,
)
return (result, cache_period)
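As an aside, a minimal standalone sketch (not part of the diff) of the multiplicative jitter used above, assuming the module constants shown earlier; note the real code also clamps header-derived periods to the min/max bounds:

import random

WELL_KNOWN_DEFAULT_CACHE_PERIOD = 24 * 3600
WELL_KNOWN_DEFAULT_CACHE_PERIOD_JITTER = 0.1

def jittered_cache_period(cache_period=WELL_KNOWN_DEFAULT_CACHE_PERIOD):
    # Scale the period by a random factor in [0.9, 1.1] so that a fleet of
    # servers started at the same time does not refetch in lockstep.
    return cache_period * random.uniform(
        1 - WELL_KNOWN_DEFAULT_CACHE_PERIOD_JITTER,
        1 + WELL_KNOWN_DEFAULT_CACHE_PERIOD_JITTER,
    )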
@defer.inlineCallbacks
def _make_well_known_request(self, server_name, retry):
"""Make the well known request.
This will retry the request if requested and it fails (e.g. because we are
unable to connect, or receive a 5xx error).
Args:
server_name (bytes)
retry (bool): Whether to retry the request if it fails.
Returns:
Deferred[tuple[IResponse, bytes]]: Returns the response object and
body. Response may be a non-200 response.
"""
uri = b"https://%s/.well-known/matrix/server" % (server_name,)
uri_str = uri.decode("ascii")
i = 0
while True:
i += 1
logger.info("Fetching %s", uri_str)
try:
response = yield make_deferred_yieldable(
self._well_known_agent.request(b"GET", uri)
)
body = yield make_deferred_yieldable(readBody(response))
if 500 <= response.code < 600:
raise Exception("Non-200 response %s" % (response.code,))
return response, body
except defer.CancelledError:
# Bail if we've been cancelled
raise
except Exception as e:
if not retry or i >= WELL_KNOWN_RETRY_ATTEMPTS:
logger.info("Error fetching %s: %s", uri_str, e)
raise _FetchWellKnownFailure(temporary=True)
logger.info("Error fetching %s: %s. Retrying", uri_str, e)
# Sleep briefly in the hopes that they come back up
yield self._clock.sleep(0.5)
def _cache_period_from_headers(headers, time_now=time.time):
cache_controls = _parse_cache_control(headers)

View File

@@ -300,7 +300,7 @@ class RestServlet(object):
http_server.register_paths(
method,
patterns,
trace_servlet(servlet_classname, method_handler),
trace_servlet(servlet_classname)(method_handler),
servlet_classname,
)

View File

@@ -43,6 +43,9 @@ OpenTracing to be easily disabled in Synapse and thereby have OpenTracing as
an optional dependency. This does however limit the number of modifiable spans
at any point in the code to one. From here out references to `opentracing`
in the code snippets refer to Synapse's module.
Most methods provided in the module have a direct correlation to those provided
by opentracing. Refer to the opentracing docs for more in-depth documentation of
the args and methods.
Tracing
-------
@@ -68,52 +71,62 @@ set a tag on the current active span.
Tracing functions
-----------------
Functions can be easily traced using decorators. There is a decorator for
'normal' function and for functions which are actually deferreds. The name of
Functions can be easily traced using decorators. The name of
the function becomes the operation name for the span.
.. code-block:: python
from synapse.logging.opentracing import trace, trace_deferred
from synapse.logging.opentracing import trace
# Start a span using 'normal_function' as the operation name
# Start a span using 'interesting_function' as the operation name
@trace
def normal_function(*args, **kwargs):
def interesting_function(*args, **kwargs):
# Does all kinds of cool and expected things
return something_usual_and_useful
# Start a span using 'deferred_function' as the operation name
@trace_deferred
@defer.inlineCallbacks
def deferred_function(*args, **kwargs):
# We start
yield we_wait
# we finish
return something_usual_and_useful
Operation names can be explicitly set for functions by using
``trace_using_operation_name`` and
``trace_deferred_using_operation_name``
``trace_using_operation_name``
.. code-block:: python
from synapse.logging.opentracing import (
trace_using_operation_name,
trace_deferred_using_operation_name
)
from synapse.logging.opentracing import trace_using_operation_name
@trace_using_operation_name("A *much* better operation name")
def normal_function(*args, **kwargs):
def interesting_badly_named_function(*args, **kwargs):
# Does all kinds of cool and expected things
return something_usual_and_useful
@trace_deferred_using_operation_name("Another exciting operation name!")
@defer.inlineCallbacks
def deferred_function(*args, **kwargs):
# We start
yield we_wait
# we finish
return something_usual_and_useful
Setting Tags
------------
To set a tag on the active span do
.. code-block:: python
from synapse.logging.opentracing import set_tag
set_tag(tag_name, tag_value)
There's a convenient decorator to tag all the args of the method. It uses
inspection in order to use the formal parameter names prefixed with 'ARG_' as
tag names. It uses kwarg names as tag names without the prefix.
.. code-block:: python
from synapse.logging.opentracing import tag_args
@tag_args
def set_fates(clotho, lachesis, atropos, father="Zeus", mother="Themis"):
pass
set_fates("the story", "the end", "the act")
# This will have the following tags
# - ARG_clotho: "the story"
# - ARG_lachesis: "the end"
# - ARG_atropos: "the act"
# - father: "Zeus"
# - mother: "Themis"
Contexts and carriers
---------------------
@@ -136,6 +149,9 @@ unchartered waters will require the enforcement of the whitelist.
``logging/opentracing.py`` has a ``whitelisted_homeserver`` method which takes
in a destination and compares it to the whitelist.
Most injection methods take a 'destination' arg. The context will only be injected
if the destination matches the whitelist or the destination is None.
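For illustration, a hedged example of injecting into a plain dict carrier (the destination name here is a placeholder; nothing is injected unless tracing is enabled and the destination is allowed):

.. code-block:: python

    from synapse.logging.opentracing import inject_active_span_text_map

    carrier = {}
    inject_active_span_text_map(carrier, destination="remote.example.com")
    # carrier now contains the serialised span context, or is left empty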
=======
Gotchas
=======
@@ -161,10 +177,48 @@ from twisted.internet import defer
from synapse.config import ConfigError
# Helper class
class _DummyTagNames(object):
"""wrapper of opentracings tags. We need to have them if we
want to reference them without opentracing around. Clearly they
should never actually show up in a trace. `set_tags` overwrites
these with the correct ones."""
INVALID_TAG = "invalid-tag"
COMPONENT = INVALID_TAG
DATABASE_INSTANCE = INVALID_TAG
DATABASE_STATEMENT = INVALID_TAG
DATABASE_TYPE = INVALID_TAG
DATABASE_USER = INVALID_TAG
ERROR = INVALID_TAG
HTTP_METHOD = INVALID_TAG
HTTP_STATUS_CODE = INVALID_TAG
HTTP_URL = INVALID_TAG
MESSAGE_BUS_DESTINATION = INVALID_TAG
PEER_ADDRESS = INVALID_TAG
PEER_HOSTNAME = INVALID_TAG
PEER_HOST_IPV4 = INVALID_TAG
PEER_HOST_IPV6 = INVALID_TAG
PEER_PORT = INVALID_TAG
PEER_SERVICE = INVALID_TAG
SAMPLING_PRIORITY = INVALID_TAG
SERVICE = INVALID_TAG
SPAN_KIND = INVALID_TAG
SPAN_KIND_CONSUMER = INVALID_TAG
SPAN_KIND_PRODUCER = INVALID_TAG
SPAN_KIND_RPC_CLIENT = INVALID_TAG
SPAN_KIND_RPC_SERVER = INVALID_TAG
try:
import opentracing
tags = opentracing.tags
except ImportError:
opentracing = None
tags = _DummyTagNames
try:
from jaeger_client import Config as JaegerConfig
from synapse.logging.scopecontextmanager import LogContextScopeManager
@@ -239,10 +293,6 @@ def init_tracer(config):
scope_manager=LogContextScopeManager(config),
).initialize_tracer()
# Set up tags to be opentracing's tags
global tags
tags = opentracing.tags
# Whitelisting
@@ -321,8 +371,8 @@ def start_active_span_follows_from(operation_name, contexts):
return scope
def start_active_span_from_context(
headers,
def start_active_span_from_request(
request,
operation_name,
references=None,
tags=None,
@@ -331,9 +381,9 @@ def start_active_span_from_context(
finish_on_close=True,
):
"""
Extracts a span context from Twisted Headers.
Extracts a span context from a Twisted Request.
args:
headers (twisted.web.http_headers.Headers)
request (twisted.web.http.Request)
For the other args see opentracing.tracer
@@ -347,7 +397,9 @@ def start_active_span_from_context(
if opentracing is None:
return _noop_context_manager()
header_dict = {k.decode(): v[0].decode() for k, v in headers.getAllRawHeaders()}
header_dict = {
k.decode(): v[0].decode() for k, v in request.requestHeaders.getAllRawHeaders()
}
context = opentracing.tracer.extract(opentracing.Format.HTTP_HEADERS, header_dict)
return opentracing.tracer.start_active_span(
@@ -435,7 +487,7 @@ def set_operation_name(operation_name):
@only_if_tracing
def inject_active_span_twisted_headers(headers, destination):
def inject_active_span_twisted_headers(headers, destination, check_destination=True):
"""
Injects a span context into twisted headers in-place
@@ -454,7 +506,7 @@ def inject_active_span_twisted_headers(headers, destination):
https://github.com/jaegertracing/jaeger-client-python/blob/master/jaeger_client/constants.py
"""
if not whitelisted_homeserver(destination):
if check_destination and not whitelisted_homeserver(destination):
return
span = opentracing.tracer.active_span
@@ -466,7 +518,7 @@ def inject_active_span_twisted_headers(headers, destination):
@only_if_tracing
def inject_active_span_byte_dict(headers, destination):
def inject_active_span_byte_dict(headers, destination, check_destination=True):
"""
Injects a span context into a dict where the headers are encoded as byte
strings
@@ -498,7 +550,7 @@ def inject_active_span_byte_dict(headers, destination):
@only_if_tracing
def inject_active_span_text_map(carrier, destination=None):
def inject_active_span_text_map(carrier, destination, check_destination=True):
"""
Injects a span context into a dict
@@ -519,7 +571,7 @@ def inject_active_span_text_map(carrier, destination=None):
https://github.com/jaegertracing/jaeger-client-python/blob/master/jaeger_client/constants.py
"""
if destination and not whitelisted_homeserver(destination):
if check_destination and not whitelisted_homeserver(destination):
return
opentracing.tracer.inject(
@@ -527,6 +579,29 @@ def inject_active_span_text_map(carrier, destination=None):
)
def get_active_span_text_map(destination=None):
"""
Gets a span context as a dict. This can be used instead of manually
injecting a span into an empty carrier.
Args:
destination (str): the name of the remote server.
Returns:
dict: the active span's context if opentracing is enabled, otherwise empty.
"""
if not opentracing or (destination and not whitelisted_homeserver(destination)):
return {}
carrier = {}
opentracing.tracer.inject(
opentracing.tracer.active_span, opentracing.Format.TEXT_MAP, carrier
)
return carrier
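As a sketch of the intended usage (the destination is a placeholder):

    carrier = get_active_span_text_map("remote.example.com")
    # carrier is a dict that can be attached to an outgoing EDU or request body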
def active_span_context_as_string():
"""
Returns:
@@ -676,65 +751,43 @@ def tag_args(func):
return _tag_args_inner
def trace_servlet(servlet_name, func):
def trace_servlet(servlet_name, extract_context=False):
"""Decorator which traces a serlet. It starts a span with some servlet specific
tags such as the servlet_name and request information"""
if not opentracing:
return func
tags such as the servlet_name and request information
@wraps(func)
@defer.inlineCallbacks
def _trace_servlet_inner(request, *args, **kwargs):
with start_active_span(
"incoming-client-request",
tags={
Args:
servlet_name (str): The name to be used for the span's operation_name
extract_context (bool): Whether to attempt to extract the opentracing
context from the request the servlet is handling.
"""
def _trace_servlet_inner_1(func):
if not opentracing:
return func
@wraps(func)
@defer.inlineCallbacks
def _trace_servlet_inner(request, *args, **kwargs):
request_tags = {
"request_id": request.get_request_id(),
tags.SPAN_KIND: tags.SPAN_KIND_RPC_SERVER,
tags.HTTP_METHOD: request.get_method(),
tags.HTTP_URL: request.get_redacted_uri(),
tags.PEER_HOST_IPV6: request.getClientIP(),
"servlet_name": servlet_name,
},
):
result = yield defer.maybeDeferred(func, request, *args, **kwargs)
return result
}
return _trace_servlet_inner
if extract_context:
scope = start_active_span_from_request(
request, servlet_name, tags=request_tags
)
else:
scope = start_active_span(servlet_name, tags=request_tags)
with scope:
result = yield defer.maybeDeferred(func, request, *args, **kwargs)
return result
# Helper class
return _trace_servlet_inner
class _DummyTagNames(object):
"""wrapper of opentracings tags. We need to have them if we
want to reference them without opentracing around. Clearly they
should never actually show up in a trace. `set_tags` overwrites
these with the correct ones."""
INVALID_TAG = "invalid-tag"
COMPONENT = INVALID_TAG
DATABASE_INSTANCE = INVALID_TAG
DATABASE_STATEMENT = INVALID_TAG
DATABASE_TYPE = INVALID_TAG
DATABASE_USER = INVALID_TAG
ERROR = INVALID_TAG
HTTP_METHOD = INVALID_TAG
HTTP_STATUS_CODE = INVALID_TAG
HTTP_URL = INVALID_TAG
MESSAGE_BUS_DESTINATION = INVALID_TAG
PEER_ADDRESS = INVALID_TAG
PEER_HOSTNAME = INVALID_TAG
PEER_HOST_IPV4 = INVALID_TAG
PEER_HOST_IPV6 = INVALID_TAG
PEER_PORT = INVALID_TAG
PEER_SERVICE = INVALID_TAG
SAMPLING_PRIORITY = INVALID_TAG
SERVICE = INVALID_TAG
SPAN_KIND = INVALID_TAG
SPAN_KIND_CONSUMER = INVALID_TAG
SPAN_KIND_PRODUCER = INVALID_TAG
SPAN_KIND_RPC_CLIENT = INVALID_TAG
SPAN_KIND_RPC_SERVER = INVALID_TAG
tags = _DummyTagNames
return _trace_servlet_inner_1
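For example (a minimal illustration; the servlet name and handler are placeholders), the reworked function is now a decorator factory, so call sites wrap handlers in two steps, as in the register_paths changes elsewhere in this diff:

    handler = trace_servlet("MyReplicationServlet", extract_context=True)(handler)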

View File

@@ -22,6 +22,7 @@ from six.moves import urllib
from twisted.internet import defer
import synapse.logging.opentracing as opentracing
from synapse.api.errors import (
CodeMessageException,
HttpResponseException,
@@ -165,8 +166,12 @@ class ReplicationEndpoint(object):
# have a good idea that the request has either succeeded or failed on
# the master, and so whether we should clean up or not.
while True:
headers = {}
opentracing.inject_active_span_byte_dict(
headers, None, check_destination=False
)
try:
result = yield request_func(uri, data)
result = yield request_func(uri, data, headers=headers)
break
except CodeMessageException as e:
if e.code != 504 or not cls.RETRY_ON_TIMEOUT:
@@ -205,7 +210,14 @@ class ReplicationEndpoint(object):
args = "/".join("(?P<%s>[^/]+)" % (arg,) for arg in url_args)
pattern = re.compile("^/_synapse/replication/%s/%s$" % (self.NAME, args))
http_server.register_paths(method, [pattern], handler, self.__class__.__name__)
http_server.register_paths(
method,
[pattern],
opentracing.trace_servlet(self.__class__.__name__, extract_context=True)(
handler
),
self.__class__.__name__,
)
def _cached_handler(self, request, txn_id, **kwargs):
"""Called on new incoming requests when caching is enabled. Checks

View File

@@ -42,6 +42,7 @@ from synapse.rest.admin._base import (
historical_admin_path_patterns,
)
from synapse.rest.admin.media import register_servlets_for_media_repo
from synapse.rest.admin.purge_room_servlet import PurgeRoomServlet
from synapse.rest.admin.server_notice_servlet import SendServerNoticeServlet
from synapse.rest.admin.users import UserAdminServlet
from synapse.types import UserID, create_requester
@@ -739,6 +740,7 @@ def register_servlets(hs, http_server):
Register all the admin servlets.
"""
register_servlets_for_client_rest_resource(hs, http_server)
PurgeRoomServlet(hs).register(http_server)
SendServerNoticeServlet(hs).register(http_server)
VersionServlet(hs).register(http_server)
UserAdminServlet(hs).register(http_server)

View File

@@ -0,0 +1,57 @@
# -*- coding: utf-8 -*-
# Copyright 2019 The Matrix.org Foundation C.I.C.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import re
from synapse.http.servlet import (
RestServlet,
assert_params_in_dict,
parse_json_object_from_request,
)
from synapse.rest.admin import assert_requester_is_admin
class PurgeRoomServlet(RestServlet):
"""Servlet which will remove all trace of a room from the database
POST /_synapse/admin/v1/purge_room
{
"room_id": "!room:id"
}
returns:
{}
"""
PATTERNS = (re.compile("^/_synapse/admin/v1/purge_room$"),)
def __init__(self, hs):
"""
Args:
hs (synapse.server.HomeServer): server
"""
self.hs = hs
self.auth = hs.get_auth()
self.pagination_handler = hs.get_pagination_handler()
async def on_POST(self, request):
await assert_requester_is_admin(self.auth, request)
body = parse_json_object_from_request(request)
assert_params_in_dict(body, ("room_id",))
await self.pagination_handler.purge_room(body["room_id"])
return (200, {})
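A hedged usage sketch for this endpoint (the homeserver URL, admin access token, and room ID are placeholders, and the `requests` library is assumed to be available):

    import requests

    resp = requests.post(
        "https://homeserver.example.com/_synapse/admin/v1/purge_room",
        headers={"Authorization": "Bearer <admin_access_token>"},
        json={"room_id": "!room:example.com"},
    )
    resp.raise_for_status()  # the endpoint returns {} on success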

View File

@@ -24,6 +24,7 @@ from synapse.http.servlet import (
parse_json_object_from_request,
parse_string,
)
from synapse.logging.opentracing import log_kv, set_tag, trace_using_operation_name
from synapse.types import StreamToken
from ._base import client_patterns
@@ -68,6 +69,7 @@ class KeyUploadServlet(RestServlet):
self.auth = hs.get_auth()
self.e2e_keys_handler = hs.get_e2e_keys_handler()
@trace_using_operation_name("upload_keys")
@defer.inlineCallbacks
def on_POST(self, request, device_id):
requester = yield self.auth.get_user_by_req(request, allow_guest=True)
@@ -78,6 +80,14 @@ class KeyUploadServlet(RestServlet):
# passing the device_id here is deprecated; however, we allow it
# for now for compatibility with older clients.
if requester.device_id is not None and device_id != requester.device_id:
set_tag("error", True)
log_kv(
{
"message": "Client uploading keys for a different device",
"logged_in_id": requester.device_id,
"key_being_uploaded": device_id,
}
)
logger.warning(
"Client uploading keys for a different device "
"(logged in as %s, uploading for %s)",
@@ -178,10 +188,11 @@ class KeyChangesServlet(RestServlet):
requester = yield self.auth.get_user_by_req(request, allow_guest=True)
from_token_string = parse_string(request, "from")
set_tag("from", from_token_string)
# We want to enforce they do pass us one, but we ignore it and return
# changes after the "to" as well as before.
parse_string(request, "to")
set_tag("to", parse_string(request, "to"))
from_token = StreamToken.from_string(from_token_string)

View File

@@ -16,7 +16,6 @@
import hmac
import logging
from hashlib import sha1
from six import string_types
@@ -239,14 +238,12 @@ class RegisterRestServlet(RestServlet):
# we do basic sanity checks here because the auth layer will store these
# in sessions. Pull out the username/password provided to us.
desired_password = None
if "password" in body:
if (
not isinstance(body["password"], string_types)
or len(body["password"]) > 512
):
raise SynapseError(400, "Invalid password")
desired_password = body["password"]
desired_username = None
if "username" in body:
@@ -261,8 +258,8 @@ class RegisterRestServlet(RestServlet):
if self.auth.has_access_token(request):
appservice = yield self.auth.get_appservice_by_req(request)
# fork off as soon as possible for ASes and shared secret auth which
# have completely different registration flows to normal users
# fork off as soon as possible for ASes which have completely
# different registration flows to normal users
# == Application Service Registration ==
if appservice:
@@ -285,8 +282,8 @@ class RegisterRestServlet(RestServlet):
return (200, result) # we throw for non 200 responses
return
# for either shared secret or regular registration, downcase the
# provided username before attempting to register it. This should mean
# for regular registration, downcase the provided username before
# attempting to register it. This should mean
# that people who try to register with upper-case in their usernames
# don't get a nasty surprise. (Note that we treat username
# case-insensitively in login, so they are free to carry on imagining
@@ -294,16 +291,6 @@ class RegisterRestServlet(RestServlet):
if desired_username is not None:
desired_username = desired_username.lower()
# == Shared Secret Registration == (e.g. create new user scripts)
if "mac" in body:
# FIXME: Should we really be determining if this is shared secret
# auth based purely on the 'mac' key?
result = yield self._do_shared_secret_registration(
desired_username, desired_password, body
)
return (200, result) # we throw for non 200 responses
return
# == Normal User Registration == (everyone else)
if not self.hs.config.enable_registration:
raise SynapseError(403, "Registration has been disabled")
@@ -512,42 +499,6 @@ class RegisterRestServlet(RestServlet):
)
return (yield self._create_registration_details(user_id, body))
@defer.inlineCallbacks
def _do_shared_secret_registration(self, username, password, body):
if not self.hs.config.registration_shared_secret:
raise SynapseError(400, "Shared secret registration is not enabled")
if not username:
raise SynapseError(
400, "username must be specified", errcode=Codes.BAD_JSON
)
# use the username from the original request rather than the
# downcased one in `username` for the mac calculation
user = body["username"].encode("utf-8")
# str() because otherwise hmac complains that 'unicode' does not
# have the buffer interface
got_mac = str(body["mac"])
# FIXME this is different to the /v1/register endpoint, which
# includes the password and admin flag in the hashed text. Why are
# these different?
want_mac = hmac.new(
key=self.hs.config.registration_shared_secret.encode(),
msg=user,
digestmod=sha1,
).hexdigest()
if not compare_digest(want_mac, got_mac):
raise SynapseError(403, "HMAC incorrect")
user_id = yield self.registration_handler.register_user(
localpart=username, password=password
)
result = yield self._create_registration_details(user_id, body)
return result
@defer.inlineCallbacks
def _create_registration_details(self, user_id, params):
"""Complete registration of newly-registered user

View File

@@ -13,7 +13,9 @@
# limitations under the License.
import logging
from io import BytesIO
from canonicaljson import encode_canonical_json, json
from signedjson.sign import sign_json
from twisted.internet import defer
@@ -95,6 +97,7 @@ class RemoteKey(DirectServeResource):
self.store = hs.get_datastore()
self.clock = hs.get_clock()
self.federation_domain_whitelist = hs.config.federation_domain_whitelist
self.config = hs.config
@wrap_json_request_handler
async def _async_render_GET(self, request):
@@ -214,15 +217,14 @@ class RemoteKey(DirectServeResource):
yield self.fetcher.get_keys(cache_misses)
yield self.query_keys(request, query, query_remote_on_cache_miss=False)
else:
result_io = BytesIO()
result_io.write(b'{"server_keys":')
sep = b"["
for json_bytes in json_results:
result_io.write(sep)
result_io.write(json_bytes)
sep = b","
if sep == b"[":
result_io.write(sep)
result_io.write(b"]}")
signed_keys = []
for key_json in json_results:
key_json = json.loads(key_json)
for signing_key in self.config.key_server_signing_keys:
key_json = sign_json(key_json, self.config.server_name, signing_key)
respond_with_json_bytes(request, 200, result_io.getvalue())
signed_keys.append(key_json)
results = {"server_keys": signed_keys}
respond_with_json_bytes(request, 200, encode_canonical_json(results))

View File

@@ -34,7 +34,7 @@ class WellKnownBuilder(object):
self._config = hs.config
def get_well_known(self):
# if we don't have a public_base_url, we can't help much here.
# if we don't have a public_baseurl, we can't help much here.
if self._config.public_baseurl is None:
return None

View File

@@ -21,6 +21,11 @@ from canonicaljson import json
from twisted.internet import defer
from synapse.api.errors import StoreError
from synapse.logging.opentracing import (
get_active_span_text_map,
trace,
whitelisted_homeserver,
)
from synapse.metrics.background_process_metrics import run_as_background_process
from synapse.storage._base import Cache, SQLBaseStore, db_to_json
from synapse.storage.background_updates import BackgroundUpdateStore
@@ -73,6 +78,7 @@ class DeviceWorkerStore(SQLBaseStore):
return {d["device_id"]: d for d in devices}
@trace
@defer.inlineCallbacks
def get_devices_by_remote(self, destination, from_stream_id, limit):
"""Get stream of updates to send to remote servers
@@ -127,8 +133,15 @@ class DeviceWorkerStore(SQLBaseStore):
# (user_id, device_id) entries into a map, with the value being
# the max stream_id across each set of duplicate entries
#
# maps (user_id, device_id) -> stream_id
# maps (user_id, device_id) -> (stream_id, opentracing_context)
# as long as their stream_id does not match that of the last row
#
# opentracing_context contains the opentracing metadata for the request
# that created the poke
#
# The most recent request's opentracing_context is used as the
# context which created the Edu.
query_map = {}
for update in updates:
if stream_id_cutoff is not None and update[2] >= stream_id_cutoff:
@@ -136,7 +149,14 @@ class DeviceWorkerStore(SQLBaseStore):
break
key = (update[0], update[1])
query_map[key] = max(query_map.get(key, 0), update[2])
update_context = update[3]
update_stream_id = update[2]
previous_update_stream_id, _ = query_map.get(key, (0, None))
if update_stream_id > previous_update_stream_id:
query_map[key] = (update_stream_id, update_context)
# If we didn't find any updates with a stream_id lower than the cutoff, it
# means that there are more than limit updates all of which have the same
@@ -171,7 +191,7 @@ class DeviceWorkerStore(SQLBaseStore):
List: List of device updates
"""
sql = """
SELECT user_id, device_id, stream_id FROM device_lists_outbound_pokes
SELECT user_id, device_id, stream_id, opentracing_context FROM device_lists_outbound_pokes
WHERE destination = ? AND ? < stream_id AND stream_id <= ? AND sent = ?
ORDER BY stream_id
LIMIT ?
@@ -187,8 +207,9 @@ class DeviceWorkerStore(SQLBaseStore):
Args:
destination (str): The host the device updates are intended for
from_stream_id (int): The minimum stream_id to filter updates by, exclusive
query_map (Dict[(str, str): int]): Dictionary mapping
user_id/device_id to update stream_id
query_map (Dict[(str, str): (int, str|None)]): Dictionary mapping
user_id/device_id to update stream_id and the relevant json-encoded
opentracing context
Returns:
List[Dict]: List of objects representing a device update EDU
@@ -210,12 +231,13 @@ class DeviceWorkerStore(SQLBaseStore):
destination, user_id, from_stream_id
)
for device_id, device in iteritems(user_devices):
stream_id = query_map[(user_id, device_id)]
stream_id, opentracing_context = query_map[(user_id, device_id)]
result = {
"user_id": user_id,
"device_id": device_id,
"prev_id": [prev_id] if prev_id else [],
"stream_id": stream_id,
"org.matrix.opentracing_context": opentracing_context,
}
prev_id = stream_id
@@ -814,6 +836,8 @@ class DeviceStore(DeviceWorkerStore, BackgroundUpdateStore):
],
)
context = get_active_span_text_map()
self._simple_insert_many_txn(
txn,
table="device_lists_outbound_pokes",
@@ -825,6 +849,9 @@ class DeviceStore(DeviceWorkerStore, BackgroundUpdateStore):
"device_id": device_id,
"sent": False,
"ts": now,
"opentracing_context": json.dumps(context)
if whitelisted_homeserver(destination)
else None,
}
for destination in hosts
for device_id in device_ids

View File

@@ -18,6 +18,7 @@ import json
from twisted.internet import defer
from synapse.api.errors import StoreError
from synapse.logging.opentracing import log_kv, trace
from ._base import SQLBaseStore
@@ -94,7 +95,16 @@ class EndToEndRoomKeyStore(SQLBaseStore):
},
lock=False,
)
log_kv(
{
"message": "Set room key",
"room_id": room_id,
"session_id": session_id,
"room_key": room_key,
}
)
@trace
@defer.inlineCallbacks
def get_e2e_room_keys(self, user_id, version, room_id=None, session_id=None):
"""Bulk get the E2E room keys for a given backup, optionally filtered to a given
@@ -153,6 +163,7 @@ class EndToEndRoomKeyStore(SQLBaseStore):
return sessions
@trace
@defer.inlineCallbacks
def delete_e2e_room_keys(self, user_id, version, room_id=None, session_id=None):
"""Bulk delete the E2E room keys for a given backup, optionally filtered to a given
@@ -236,6 +247,7 @@ class EndToEndRoomKeyStore(SQLBaseStore):
"get_e2e_room_keys_version_info", _get_e2e_room_keys_version_info_txn
)
@trace
def create_e2e_room_keys_version(self, user_id, info):
"""Atomically creates a new version of this user's e2e_room_keys store
with the given version info.
@@ -276,6 +288,7 @@ class EndToEndRoomKeyStore(SQLBaseStore):
"create_e2e_room_keys_version_txn", _create_e2e_room_keys_version_txn
)
@trace
def update_e2e_room_keys_version(self, user_id, version, info):
"""Update a given backup version
@@ -292,6 +305,7 @@ class EndToEndRoomKeyStore(SQLBaseStore):
desc="update_e2e_room_keys_version",
)
@trace
def delete_e2e_room_keys_version(self, user_id, version=None):
"""Delete a given backup version of the user's room keys.
Doesn't delete their actual key data.

View File

@@ -18,12 +18,14 @@ from canonicaljson import encode_canonical_json
from twisted.internet import defer
from synapse.logging.opentracing import log_kv, set_tag, trace
from synapse.util.caches.descriptors import cached
from ._base import SQLBaseStore, db_to_json
class EndToEndKeyWorkerStore(SQLBaseStore):
@trace
@defer.inlineCallbacks
def get_e2e_device_keys(
self, query_list, include_all_devices=False, include_deleted_devices=False
@@ -40,6 +42,7 @@ class EndToEndKeyWorkerStore(SQLBaseStore):
Dict mapping from user-id to dict mapping from device_id to
dict containing "key_json", "device_display_name".
"""
set_tag("query_list", query_list)
if not query_list:
return {}
@@ -57,9 +60,13 @@ class EndToEndKeyWorkerStore(SQLBaseStore):
return results
@trace
def _get_e2e_device_keys_txn(
self, txn, query_list, include_all_devices=False, include_deleted_devices=False
):
set_tag("include_all_devices", include_all_devices)
set_tag("include_deleted_devices", include_deleted_devices)
query_clauses = []
query_params = []
@@ -104,6 +111,7 @@ class EndToEndKeyWorkerStore(SQLBaseStore):
for user_id, device_id in deleted_devices:
result.setdefault(user_id, {})[device_id] = None
log_kv(result)
return result
@defer.inlineCallbacks
@@ -129,8 +137,9 @@ class EndToEndKeyWorkerStore(SQLBaseStore):
keyvalues={"user_id": user_id, "device_id": device_id},
desc="add_e2e_one_time_keys_check",
)
return {(row["algorithm"], row["key_id"]): row["key_json"] for row in rows}
result = {(row["algorithm"], row["key_id"]): row["key_json"] for row in rows}
log_kv({"message": "Fetched one time keys for user", "one_time_keys": result})
return result
@defer.inlineCallbacks
def add_e2e_one_time_keys(self, user_id, device_id, time_now, new_keys):
@@ -146,6 +155,9 @@ class EndToEndKeyWorkerStore(SQLBaseStore):
"""
def _add_e2e_one_time_keys(txn):
set_tag("user_id", user_id)
set_tag("device_id", device_id)
set_tag("new_keys", new_keys)
# We are protected from race between lookup and insertion due to
# a unique constraint. If there is a race of two calls to
# `add_e2e_one_time_keys` then they'll conflict and we will only
@@ -202,6 +214,11 @@ class EndToEndKeyStore(EndToEndKeyWorkerStore, SQLBaseStore):
"""
def _set_e2e_device_keys_txn(txn):
set_tag("user_id", user_id)
set_tag("device_id", device_id)
set_tag("time_now", time_now)
set_tag("device_keys", device_keys)
old_key_json = self._simple_select_one_onecol_txn(
txn,
table="e2e_device_keys_json",
@@ -215,6 +232,7 @@ class EndToEndKeyStore(EndToEndKeyWorkerStore, SQLBaseStore):
new_key_json = encode_canonical_json(device_keys).decode("utf-8")
if old_key_json == new_key_json:
log_kv({"Message": "Device key already stored."})
return False
self._simple_upsert_txn(
@@ -223,7 +241,7 @@ class EndToEndKeyStore(EndToEndKeyWorkerStore, SQLBaseStore):
keyvalues={"user_id": user_id, "device_id": device_id},
values={"ts_added_ms": time_now, "key_json": new_key_json},
)
log_kv({"message": "Device keys stored."})
return True
return self.runInteraction("set_e2e_device_keys", _set_e2e_device_keys_txn)
@@ -231,6 +249,7 @@ class EndToEndKeyStore(EndToEndKeyWorkerStore, SQLBaseStore):
def claim_e2e_one_time_keys(self, query_list):
"""Take a list of one time keys out of the database"""
@trace
def _claim_e2e_one_time_keys(txn):
sql = (
"SELECT key_id, key_json FROM e2e_one_time_keys_json"
@@ -252,7 +271,13 @@ class EndToEndKeyStore(EndToEndKeyWorkerStore, SQLBaseStore):
" AND key_id = ?"
)
for user_id, device_id, algorithm, key_id in delete:
log_kv(
{
"message": "Executing claim e2e_one_time_keys transaction on database."
}
)
txn.execute(sql, (user_id, device_id, algorithm, key_id))
log_kv({"message": "finished executing and invalidating cache"})
self._invalidate_cache_and_stream(
txn, self.count_e2e_one_time_keys, (user_id, device_id)
)
@@ -262,6 +287,13 @@ class EndToEndKeyStore(EndToEndKeyWorkerStore, SQLBaseStore):
def delete_e2e_keys_by_device(self, user_id, device_id):
def delete_e2e_keys_by_device_txn(txn):
log_kv(
{
"message": "Deleting keys for device",
"device_id": device_id,
"user_id": user_id,
}
)
self._simple_delete_txn(
txn,
table="e2e_device_keys_json",

View File

@@ -1302,15 +1302,11 @@ class EventsStore(
"event_reference_hashes",
"event_search",
"event_to_state_groups",
"guest_access",
"history_visibility",
"local_invites",
"room_names",
"state_events",
"rejections",
"redactions",
"room_memberships",
"topics",
):
txn.executemany(
"DELETE FROM %s WHERE event_id = ?" % (table,),
@@ -1454,10 +1450,10 @@ class EventsStore(
for event, _ in events_and_contexts:
if event.type == EventTypes.Name:
# Insert into the room_names and event_search tables.
# Insert into the event_search table.
self._store_room_name_txn(txn, event)
elif event.type == EventTypes.Topic:
# Insert into the topics table and event_search table.
# Insert into the event_search table.
self._store_room_topic_txn(txn, event)
elif event.type == EventTypes.Message:
# Insert into the event_search table.
@@ -1465,12 +1461,6 @@ class EventsStore(
elif event.type == EventTypes.Redaction:
# Insert into the redactions table.
self._store_redaction(txn, event)
elif event.type == EventTypes.RoomHistoryVisibility:
# Insert into the event_search table.
self._store_history_visibility_txn(txn, event)
elif event.type == EventTypes.GuestAccess:
# Insert into the event_search table.
self._store_guest_access_txn(txn, event)
self._handle_event_relations(txn, event)
@@ -2191,6 +2181,144 @@ class EventsStore(
return to_delete, to_dedelta
def purge_room(self, room_id):
"""Deletes all record of a room
Args:
room_id (str):
"""
return self.runInteraction("purge_room", self._purge_room_txn, room_id)
def _purge_room_txn(self, txn, room_id):
# first we have to delete the state groups states
logger.info("[purge] removing %s from state_groups_state", room_id)
txn.execute(
"""
DELETE FROM state_groups_state WHERE state_group IN (
SELECT state_group FROM events JOIN event_to_state_groups USING(event_id)
WHERE events.room_id=?
)
""",
(room_id,),
)
# ... and the state group edges
logger.info("[purge] removing %s from state_group_edges", room_id)
txn.execute(
"""
DELETE FROM state_group_edges WHERE state_group IN (
SELECT state_group FROM events JOIN event_to_state_groups USING(event_id)
WHERE events.room_id=?
)
""",
(room_id,),
)
# ... and the state groups
logger.info("[purge] removing %s from state_groups", room_id)
txn.execute(
"""
DELETE FROM state_groups WHERE id IN (
SELECT state_group FROM events JOIN event_to_state_groups USING(event_id)
WHERE events.room_id=?
)
""",
(room_id,),
)
# and then tables which lack an index on room_id but have one on event_id
for table in (
"event_auth",
"event_edges",
"event_push_actions_staging",
"event_reference_hashes",
"event_relations",
"event_to_state_groups",
"redactions",
"rejections",
"state_events",
):
logger.info("[purge] removing %s from %s", room_id, table)
txn.execute(
"""
DELETE FROM %s WHERE event_id IN (
SELECT event_id FROM events WHERE room_id=?
)
"""
% (table,),
(room_id,),
)
# and finally, the tables with an index on room_id (or no useful index)
for table in (
"current_state_events",
"event_backward_extremities",
"event_forward_extremities",
"event_json",
"event_push_actions",
"event_search",
"events",
"group_rooms",
"public_room_list_stream",
"receipts_graph",
"receipts_linearized",
"room_aliases",
"room_depth",
"room_memberships",
"room_stats_state",
"room_stats_current",
"room_stats_historical",
"room_stats_earliest_token",
"rooms",
"stream_ordering_to_exterm",
"topics",
"users_in_public_rooms",
"users_who_share_private_rooms",
# no useful index, but let's clear them anyway
"appservice_room_list",
"e2e_room_keys",
"event_push_summary",
"pusher_throttle",
"group_summary_rooms",
"local_invites",
"room_account_data",
"room_tags",
):
logger.info("[purge] removing %s from %s", room_id, table)
txn.execute("DELETE FROM %s WHERE room_id=?" % (table,), (room_id,))
# Other tables we do NOT need to clear out:
#
# - blocked_rooms
# This is important, to make sure that we don't accidentally rejoin a blocked
# room after it was purged
#
# - user_directory
# This has a room_id column, but it is unused
#
# Other tables that we might want to consider clearing out include:
#
# - event_reports
# Given that these are intended for abuse management, my initial
# inclination is to leave them in place.
#
# - current_state_delta_stream
# - ex_outlier_stream
# - room_tags_revisions
# The problem with these is that they are largeish and there is no room_id
# index on them. In any case we should be clearing out 'stream' tables
# periodically anyway (#5888)
# TODO: we could probably usefully do a bunch of cache invalidation here
logger.info("[purge] done")
@defer.inlineCallbacks
def is_event_after(self, event_id1, event_id2):
"""Returns True if event_id1 is after event_id2 in the stream

View File

@@ -238,6 +238,13 @@ def _upgrade_existing_database(
logger.debug("applied_delta_files: %s", applied_delta_files)
if isinstance(database_engine, PostgresEngine):
specific_engine_extension = ".postgres"
else:
specific_engine_extension = ".sqlite"
specific_engine_extensions = (".sqlite", ".postgres")
for v in range(start_ver, SCHEMA_VERSION + 1):
logger.info("Upgrading schema to v%d", v)
@@ -274,15 +281,22 @@ def _upgrade_existing_database(
# Sometimes .pyc files turn up anyway even though we've
# disabled their generation; e.g. from distribution package
# installers. Silently skip it
pass
continue
elif ext == ".sql":
# A plain old .sql file, just read and execute it
logger.info("Applying schema %s", relative_path)
executescript(cur, absolute_path)
elif ext == specific_engine_extension and root_name.endswith(".sql"):
# A .sql file specific to our engine; just read and execute it
logger.info("Applying engine-specific schema %s", relative_path)
executescript(cur, absolute_path)
elif ext in specific_engine_extensions and root_name.endswith(".sql"):
# A .sql file for a different engine; skip it.
continue
else:
# Not a valid delta file.
logger.warn(
"Found directory entry that did not end in .py or" " .sql: %s",
logger.warning(
"Found directory entry that did not end in .py or .sql: %s",
relative_path,
)
continue
@@ -290,7 +304,7 @@ def _upgrade_existing_database(
# Mark as done.
cur.execute(
database_engine.convert_param_style(
"INSERT INTO applied_schema_deltas (version, file)" " VALUES (?,?)"
"INSERT INTO applied_schema_deltas (version, file) VALUES (?,?)"
),
(v, relative_path),
)
@@ -298,7 +312,7 @@ def _upgrade_existing_database(
cur.execute("DELETE FROM schema_version")
cur.execute(
database_engine.convert_param_style(
"INSERT INTO schema_version (version, upgraded)" " VALUES (?,?)"
"INSERT INTO schema_version (version, upgraded) VALUES (?,?)"
),
(v, True),
)

View File

@@ -386,32 +386,12 @@ class RoomStore(RoomWorkerStore, SearchStore):
def _store_room_topic_txn(self, txn, event):
if hasattr(event, "content") and "topic" in event.content:
self._simple_insert_txn(
txn,
"topics",
{
"event_id": event.event_id,
"room_id": event.room_id,
"topic": event.content["topic"],
},
)
self.store_event_search_txn(
txn, event, "content.topic", event.content["topic"]
)
def _store_room_name_txn(self, txn, event):
if hasattr(event, "content") and "name" in event.content:
self._simple_insert_txn(
txn,
"room_names",
{
"event_id": event.event_id,
"room_id": event.room_id,
"name": event.content["name"],
},
)
self.store_event_search_txn(
txn, event, "content.name", event.content["name"]
)
@@ -422,21 +402,6 @@ class RoomStore(RoomWorkerStore, SearchStore):
txn, event, "content.body", event.content["body"]
)
def _store_history_visibility_txn(self, txn, event):
self._store_content_index_txn(txn, event, "history_visibility")
def _store_guest_access_txn(self, txn, event):
self._store_content_index_txn(txn, event, "guest_access")
def _store_content_index_txn(self, txn, event, key):
if hasattr(event, "content") and key in event.content:
sql = (
"INSERT INTO %(key)s"
" (event_id, room_id, %(key)s)"
" VALUES (?, ?, ?)" % {"key": key}
)
txn.execute(sql, (event.event_id, event.room_id, event.content[key]))
def add_event_report(
self, room_id, event_id, user_id, reason, content, received_ts
):

View File

@@ -0,0 +1,20 @@
/* Copyright 2019 The Matrix.org Foundation C.I.C
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
/*
* Opentracing context data for inclusion in the device_list_update EDUs, as a
* json-encoded dictionary. NULL if opentracing is disabled (or not enabled for this destination).
*/
ALTER TABLE device_lists_outbound_pokes ADD opentracing_context TEXT;

View File

@@ -0,0 +1,20 @@
/* Copyright 2019 The Matrix.org Foundation C.I.C.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-- these tables are never used.
DROP TABLE IF EXISTS room_names;
DROP TABLE IF EXISTS topics;
DROP TABLE IF EXISTS history_visibility;
DROP TABLE IF EXISTS guest_access;

View File

@@ -0,0 +1,142 @@
/* Copyright 2018 New Vector Ltd
* Copyright 2019 The Matrix.org Foundation C.I.C.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
----- First clean up from previous versions of room stats.
-- First remove old stats stuff
DROP TABLE IF EXISTS room_stats;
DROP TABLE IF EXISTS user_stats;
DROP TABLE IF EXISTS room_stats_earliest_tokens;
DROP TABLE IF EXISTS _temp_populate_stats_position;
DROP TABLE IF EXISTS _temp_populate_stats_rooms;
DROP TABLE IF EXISTS stats_stream_pos;
-- Unschedule old background updates if they're still scheduled
DELETE FROM background_updates WHERE update_name IN (
'populate_stats_createtables',
'populate_stats_process_rooms',
'populate_stats_cleanup'
);
----- Create tables for our version of room stats.
-- single-row table to track position of incremental updates
CREATE TABLE IF NOT EXISTS stats_incremental_position (
-- the stream_id of the last-processed state delta
state_delta_stream_id BIGINT,
-- the stream_ordering of the last-processed backfilled event
-- (this is negative)
total_events_min_stream_ordering BIGINT,
-- the stream_ordering of the last-processed normally-created event
-- (this is positive)
total_events_max_stream_ordering BIGINT,
-- If true, this represents the contract agreed upon by the stats
-- regenerator.
-- If false, this is suitable for use by the delta/incremental processor.
is_background_contract BOOLEAN NOT NULL PRIMARY KEY
);
-- insert a null row and make sure it is the only one.
DELETE FROM stats_incremental_position;
INSERT INTO stats_incremental_position (
state_delta_stream_id,
total_events_min_stream_ordering,
total_events_max_stream_ordering,
is_background_contract
) VALUES (NULL, NULL, NULL, (0 = 1)), (NULL, NULL, NULL, (1 = 1));
-- represents PRESENT room statistics for a room
-- only holds absolute fields
CREATE TABLE IF NOT EXISTS room_stats_current (
room_id TEXT NOT NULL PRIMARY KEY,
current_state_events INT NOT NULL,
total_events INT NOT NULL,
joined_members INT NOT NULL,
invited_members INT NOT NULL,
left_members INT NOT NULL,
banned_members INT NOT NULL,
-- If initial stats regen is still to be performed: NULL
-- If initial stats regen has been performed: the maximum delta stream
-- position that this row takes into account.
completed_delta_stream_id BIGINT
);
-- represents HISTORICAL room statistics for a room
CREATE TABLE IF NOT EXISTS room_stats_historical (
room_id TEXT NOT NULL,
-- These stats cover the time from (end_ts - bucket_size)...end_ts (in ms).
-- Note that end_ts is quantised.
end_ts BIGINT NOT NULL,
bucket_size INT NOT NULL,
current_state_events INT NOT NULL,
total_events INT NOT NULL,
joined_members INT NOT NULL,
invited_members INT NOT NULL,
left_members INT NOT NULL,
banned_members INT NOT NULL,
PRIMARY KEY (room_id, end_ts)
);
-- We use this index to speed up deletion of ancient room stats.
CREATE INDEX IF NOT EXISTS room_stats_historical_end_ts ON room_stats_historical (end_ts);
-- We don't need an index on (room_id, end_ts) because PRIMARY KEY sorts that
-- out for us. (We would want it to review stats for a particular room.)
-- represents PRESENT statistics for a user
-- only holds absolute fields
CREATE TABLE IF NOT EXISTS user_stats_current (
user_id TEXT NOT NULL PRIMARY KEY,
public_rooms INT NOT NULL,
private_rooms INT NOT NULL,
-- If initial stats regen is still to be performed: NULL
-- If initial stats regen has been performed: the maximum delta stream
-- position that this row takes into account.
completed_delta_stream_id BIGINT
);
-- represents HISTORICAL statistics for a user
CREATE TABLE IF NOT EXISTS user_stats_historical (
user_id TEXT NOT NULL,
end_ts BIGINT NOT NULL,
bucket_size INT NOT NULL,
public_rooms INT NOT NULL,
private_rooms INT NOT NULL,
PRIMARY KEY (user_id, end_ts)
);
-- We use this index to speed up deletion of ancient user stats.
CREATE INDEX IF NOT EXISTS user_stats_historical_end_ts ON user_stats_historical (end_ts);
-- We don't need an index on (user_id, end_ts) because PRIMARY KEY sorts that
-- out for us. (We would want it to review stats for a particular user.)
-- Also rename room_state to room_stats_state to make its ownership clear.
ALTER TABLE room_state RENAME TO room_stats_state;

View File

@@ -0,0 +1,24 @@
/* Copyright 2019 The Matrix.org Foundation C.I.C.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-- These partial indices help us find incomplete stats rows
CREATE INDEX IF NOT EXISTS room_stats_not_complete
ON room_stats_current (room_id)
WHERE completed_delta_stream_id IS NULL;
CREATE INDEX IF NOT EXISTS user_stats_not_complete
ON user_stats_current (user_id)
WHERE completed_delta_stream_id IS NULL;

View File

@@ -0,0 +1,27 @@
/* Copyright 2019 The Matrix.org Foundation C.I.C.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-- even though SQLite >= 3.8 can support partial indices, we won't enable
-- them, in case the SQLite database is later used on another system.
-- It's also the case that SQLite is only likely to be used in small
-- deployments or testing, where the optimisations gained by use of a
-- partial index are not a big concern.
CREATE INDEX IF NOT EXISTS room_stats_not_complete
ON room_stats_current (completed_delta_stream_id, room_id);
CREATE INDEX IF NOT EXISTS user_stats_not_complete
ON user_stats_current (completed_delta_stream_id, user_id);

View File

@@ -0,0 +1,17 @@
/* Copyright 2019 Matrix.org Foundation CIC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-- this was apparently forgotten when the table was created back in delta 53.
CREATE INDEX users_in_public_rooms_r_idx ON users_in_public_rooms(room_id);

View File

@@ -1,5 +1,6 @@
# -*- coding: utf-8 -*-
# Copyright 2018, 2019 New Vector Ltd
# Copyright 2019 The Matrix.org Foundation C.I.C.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@@ -14,17 +15,16 @@
# limitations under the License.
import logging
from itertools import chain
from twisted.internet import defer
from synapse.api.constants import EventTypes, Membership
from synapse.storage.prepare_database import get_statements
from synapse.storage.state_deltas import StateDeltasStore
from synapse.util.caches.descriptors import cached
logger = logging.getLogger(__name__)
# these fields track absolutes (e.g. total number of rooms on the server)
# You can think of these as Prometheus Gauges.
# You can draw these stats on a line graph.
# Example: number of users in a room
ABSOLUTE_STATS_FIELDS = {
"room": (
"current_state_events",
@@ -32,14 +32,17 @@ ABSOLUTE_STATS_FIELDS = {
"invited_members",
"left_members",
"banned_members",
"state_events",
"total_events",
),
"user": ("public_rooms", "private_rooms"),
}
TYPE_TO_ROOM = {"room": ("room_stats", "room_id"), "user": ("user_stats", "user_id")}
# these fields are per-timeslice and so should be reset to 0 upon a new slice
# You can draw these stats on a histogram.
# Example: number of events sent locally during a time slice
PER_SLICE_FIELDS = {"room": (), "user": ()}
TEMP_TABLE = "_temp_populate_stats"
TYPE_TO_TABLE = {"room": ("room_stats", "room_id"), "user": ("user_stats", "user_id")}
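As a rough illustration of the distinction drawn by the comments above (all names and numbers here are invented, not taken from this diff): absolute fields behave like gauges whose latest value is meaningful on its own, while per-slice fields behave like counters that restart from zero in each new time slice.

# Hypothetical sketch of how the two kinds of field evolve across a slice.
absolute = {"joined_members": 10}  # gauge-like: carried over between slices
per_slice = {"local_events": 0}    # counter-like: reset at each new slice

def apply_delta(absolute, per_slice, delta):
    # Both kinds of field receive deltas in the same way...
    for key, value in delta.items():
        target = absolute if key in absolute else per_slice
        target[key] += value

def start_new_slice(per_slice):
    # ...but only per-slice fields are zeroed when a new slice begins.
    for key in per_slice:
        per_slice[key] = 0

apply_delta(absolute, per_slice, {"joined_members": 1, "local_events": 3})
assert absolute == {"joined_members": 11}
assert per_slice == {"local_events": 3}
start_new_slice(per_slice)
assert absolute == {"joined_members": 11}  # still meaningful next slice
assert per_slice == {"local_events": 0}    # restarted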
class StatsStore(StateDeltasStore):
@@ -51,292 +54,25 @@ class StatsStore(StateDeltasStore):
self.stats_enabled = hs.config.stats_enabled
self.stats_bucket_size = hs.config.stats_bucket_size
self.register_background_update_handler(
"populate_stats_createtables", self._populate_stats_createtables
)
self.register_background_update_handler(
"populate_stats_process_rooms", self._populate_stats_process_rooms
)
self.register_background_update_handler(
"populate_stats_cleanup", self._populate_stats_cleanup
)
self.register_noop_background_update("populate_stats_createtables")
self.register_noop_background_update("populate_stats_process_rooms")
self.register_noop_background_update("populate_stats_cleanup")
@defer.inlineCallbacks
def _populate_stats_createtables(self, progress, batch_size):
if not self.stats_enabled:
yield self._end_background_update("populate_stats_createtables")
return 1
# Get all the rooms that we want to process.
def _make_staging_area(txn):
# Create the temporary tables
stmts = get_statements(
"""
-- We just recreate the table, we'll be reinserting the
-- correct entries again later anyway.
DROP TABLE IF EXISTS {temp}_rooms;
CREATE TABLE IF NOT EXISTS {temp}_rooms(
room_id TEXT NOT NULL,
events BIGINT NOT NULL
);
CREATE INDEX {temp}_rooms_events
ON {temp}_rooms(events);
CREATE INDEX {temp}_rooms_id
ON {temp}_rooms(room_id);
""".format(
temp=TEMP_TABLE
).splitlines()
)
for statement in stmts:
txn.execute(statement)
sql = (
"CREATE TABLE IF NOT EXISTS "
+ TEMP_TABLE
+ "_position(position TEXT NOT NULL)"
)
txn.execute(sql)
# Get rooms we want to process from the database, only adding
# those that we haven't (i.e. those not in room_stats_earliest_token)
sql = """
INSERT INTO %s_rooms (room_id, events)
SELECT c.room_id, count(*) FROM current_state_events AS c
LEFT JOIN room_stats_earliest_token AS t USING (room_id)
WHERE t.room_id IS NULL
GROUP BY c.room_id
""" % (
TEMP_TABLE,
)
txn.execute(sql)
new_pos = yield self.get_max_stream_id_in_current_state_deltas()
yield self.runInteraction("populate_stats_temp_build", _make_staging_area)
yield self._simple_insert(TEMP_TABLE + "_position", {"position": new_pos})
self.get_earliest_token_for_room_stats.invalidate_all()
yield self._end_background_update("populate_stats_createtables")
return 1
@defer.inlineCallbacks
def _populate_stats_cleanup(self, progress, batch_size):
def quantise_stats_time(self, ts):
"""
Update the user directory stream position, then clean up the old tables.
Quantises a timestamp to be a multiple of the bucket size.
Args:
ts (int): the timestamp to quantise, in milliseconds since the Unix
Epoch
Returns:
int: a timestamp which
- is divisible by the bucket size;
- is no later than `ts`; and
- is the largest such timestamp.
"""
if not self.stats_enabled:
yield self._end_background_update("populate_stats_cleanup")
return 1
position = yield self._simple_select_one_onecol(
TEMP_TABLE + "_position", None, "position"
)
yield self.update_stats_stream_pos(position)
def _delete_staging_area(txn):
txn.execute("DROP TABLE IF EXISTS " + TEMP_TABLE + "_rooms")
txn.execute("DROP TABLE IF EXISTS " + TEMP_TABLE + "_position")
yield self.runInteraction("populate_stats_cleanup", _delete_staging_area)
yield self._end_background_update("populate_stats_cleanup")
return 1
@defer.inlineCallbacks
def _populate_stats_process_rooms(self, progress, batch_size):
if not self.stats_enabled:
yield self._end_background_update("populate_stats_process_rooms")
return 1
# If we don't have progress filed, delete everything.
if not progress:
yield self.delete_all_stats()
def _get_next_batch(txn):
# Only fetch 250 rooms, so we don't fetch too many at once, even
# if those 250 rooms have less than batch_size state events.
sql = """
SELECT room_id, events FROM %s_rooms
ORDER BY events DESC
LIMIT 250
""" % (
TEMP_TABLE,
)
txn.execute(sql)
rooms_to_work_on = txn.fetchall()
if not rooms_to_work_on:
return None
# Get how many are left to process, so we can give status on how
# far we are in processing
txn.execute("SELECT COUNT(*) FROM " + TEMP_TABLE + "_rooms")
progress["remaining"] = txn.fetchone()[0]
return rooms_to_work_on
rooms_to_work_on = yield self.runInteraction(
"populate_stats_temp_read", _get_next_batch
)
# No more rooms -- complete the transaction.
if not rooms_to_work_on:
yield self._end_background_update("populate_stats_process_rooms")
return 1
logger.info(
"Processing the next %d rooms of %d remaining",
len(rooms_to_work_on),
progress["remaining"],
)
# Number of state events we've processed by going through each room
processed_event_count = 0
for room_id, event_count in rooms_to_work_on:
current_state_ids = yield self.get_current_state_ids(room_id)
join_rules_id = current_state_ids.get((EventTypes.JoinRules, ""))
history_visibility_id = current_state_ids.get(
(EventTypes.RoomHistoryVisibility, "")
)
encryption_id = current_state_ids.get((EventTypes.RoomEncryption, ""))
name_id = current_state_ids.get((EventTypes.Name, ""))
topic_id = current_state_ids.get((EventTypes.Topic, ""))
avatar_id = current_state_ids.get((EventTypes.RoomAvatar, ""))
canonical_alias_id = current_state_ids.get((EventTypes.CanonicalAlias, ""))
event_ids = [
join_rules_id,
history_visibility_id,
encryption_id,
name_id,
topic_id,
avatar_id,
canonical_alias_id,
]
state_events = yield self.get_events(
[ev for ev in event_ids if ev is not None]
)
def _get_or_none(event_id, arg):
event = state_events.get(event_id)
if event:
return event.content.get(arg)
return None
yield self.update_room_state(
room_id,
{
"join_rules": _get_or_none(join_rules_id, "join_rule"),
"history_visibility": _get_or_none(
history_visibility_id, "history_visibility"
),
"encryption": _get_or_none(encryption_id, "algorithm"),
"name": _get_or_none(name_id, "name"),
"topic": _get_or_none(topic_id, "topic"),
"avatar": _get_or_none(avatar_id, "url"),
"canonical_alias": _get_or_none(canonical_alias_id, "alias"),
},
)
now = self.hs.get_reactor().seconds()
# quantise time to the nearest bucket
now = (now // self.stats_bucket_size) * self.stats_bucket_size
def _fetch_data(txn):
# Get the current token of the room
current_token = self._get_max_stream_id_in_current_state_deltas_txn(txn)
current_state_events = len(current_state_ids)
membership_counts = self._get_user_counts_in_room_txn(txn, room_id)
total_state_events = self._get_total_state_event_counts_txn(
txn, room_id
)
self._update_stats_txn(
txn,
"room",
room_id,
now,
{
"bucket_size": self.stats_bucket_size,
"current_state_events": current_state_events,
"joined_members": membership_counts.get(Membership.JOIN, 0),
"invited_members": membership_counts.get(Membership.INVITE, 0),
"left_members": membership_counts.get(Membership.LEAVE, 0),
"banned_members": membership_counts.get(Membership.BAN, 0),
"state_events": total_state_events,
},
)
self._simple_insert_txn(
txn,
"room_stats_earliest_token",
{"room_id": room_id, "token": current_token},
)
# We've finished a room. Delete it from the table.
self._simple_delete_one_txn(
txn, TEMP_TABLE + "_rooms", {"room_id": room_id}
)
yield self.runInteraction("update_room_stats", _fetch_data)
# Update the remaining counter.
progress["remaining"] -= 1
yield self.runInteraction(
"populate_stats",
self._background_update_progress_txn,
"populate_stats_process_rooms",
progress,
)
processed_event_count += event_count
if processed_event_count > batch_size:
# Don't process any more rooms, we've hit our batch size.
return processed_event_count
return processed_event_count
def delete_all_stats(self):
"""
Delete all statistics records.
"""
def _delete_all_stats_txn(txn):
txn.execute("DELETE FROM room_state")
txn.execute("DELETE FROM room_stats")
txn.execute("DELETE FROM room_stats_earliest_token")
txn.execute("DELETE FROM user_stats")
return self.runInteraction("delete_all_stats", _delete_all_stats_txn)
def get_stats_stream_pos(self):
return self._simple_select_one_onecol(
table="stats_stream_pos",
keyvalues={},
retcol="stream_id",
desc="stats_stream_pos",
)
def update_stats_stream_pos(self, stream_id):
return self._simple_update_one(
table="stats_stream_pos",
keyvalues={},
updatevalues={"stream_id": stream_id},
desc="update_stats_stream_pos",
)
return (ts // self.stats_bucket_size) * self.stats_bucket_size
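A small worked example of the quantisation rule described in the docstring above (the day-long bucket size is an assumption chosen for illustration):

def quantise(ts, bucket_size):
    # Same arithmetic as quantise_stats_time: round down to a bucket boundary.
    return (ts // bucket_size) * bucket_size

DAY_MS = 24 * 60 * 60 * 1000
# 2019-08-28 13:46:40 UTC (in ms) snaps back to 2019-08-28 00:00:00 UTC.
assert quantise(1567000000000, DAY_MS) == 1566950400000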
def update_room_state(self, room_id, fields):
"""
@@ -361,124 +97,271 @@ class StatsStore(StateDeltasStore):
fields[col] = None
return self._simple_upsert(
table="room_state",
table="room_stats_state",
keyvalues={"room_id": room_id},
values=fields,
desc="update_room_state",
)
def get_deltas_for_room(self, room_id, start, size=100):
def update_stats_delta(
self, ts, stats_type, stats_id, fields, complete_with_stream_id=None
):
"""
Get statistics deltas for a given room.
Updates the statistics for a subject, with a delta (difference/relative
change).
Args:
room_id (str)
start (int): Pagination start. Number of entries, not timestamp.
size (int): How many entries to return.
Returns:
Deferred[list[dict]], where the dict has the keys of
ABSOLUTE_STATS_FIELDS["room"] and "ts".
ts (int): timestamp of the change
stats_type (str): "room" or "user", the kind of subject
stats_id (str): the subject's ID (room ID or user ID)
fields (dict[str, int]): Deltas of stats values.
complete_with_stream_id (int, optional):
If supplied, converts an incomplete row into a complete row,
with the supplied stream_id marked as the stream_id where the
row was completed.
"""
return self._simple_select_list_paginate(
"room_stats",
{"room_id": room_id},
"ts",
start,
size,
retcols=(list(ABSOLUTE_STATS_FIELDS["room"]) + ["ts"]),
order_direction="DESC",
return self.runInteraction(
"update_stats_delta",
self._update_stats_delta_txn,
ts,
stats_type,
stats_id,
fields,
complete_with_stream_id=complete_with_stream_id,
)
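For orientation, a hedged sketch of how a caller might use this method (the helper name, room ID and stream ID are invented; the real callers live in the stats handler, which is outside this diff):

@defer.inlineCallbacks
def _on_user_joined(store, clock, room_id, stream_id):
    # Hypothetical caller: bump joined_members by one and mark the room's
    # current stats row as complete as of this stream position.
    yield store.update_stats_delta(
        ts=clock.time_msec(),
        stats_type="room",
        stats_id=room_id,
        fields={"joined_members": +1},
        complete_with_stream_id=stream_id,
    )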
def get_all_room_state(self):
return self._simple_select_list(
"room_state", None, retcols=("name", "topic", "canonical_alias")
)
@cached()
def get_earliest_token_for_room_stats(self, room_id):
def _update_stats_delta_txn(
self,
txn,
ts,
stats_type,
stats_id,
fields,
complete_with_stream_id=None,
absolute_field_overrides=None,
):
"""
Fetch the "earliest token". This is used by the room stats delta
processor to ignore deltas that have been processed between the
start of the background task and any particular room's stats
being calculated.
Returns:
Deferred[int]
See L{update_stats_delta}
Additional Args:
absolute_field_overrides (dict[str, int]): Current stats values
(i.e. not deltas) of absolute fields.
Does not work with per-slice fields.
"""
return self._simple_select_one_onecol(
"room_stats_earliest_token",
{"room_id": room_id},
retcol="token",
allow_none=True,
)
table, id_col = TYPE_TO_TABLE[stats_type]
def update_stats(self, stats_type, stats_id, ts, fields):
table, id_col = TYPE_TO_ROOM[stats_type]
return self._simple_upsert(
table=table,
keyvalues={id_col: stats_id, "ts": ts},
values=fields,
desc="update_stats",
)
quantised_ts = self.quantise_stats_time(int(ts))
end_ts = quantised_ts + self.stats_bucket_size
def _update_stats_txn(self, txn, stats_type, stats_id, ts, fields):
table, id_col = TYPE_TO_ROOM[stats_type]
return self._simple_upsert_txn(
txn, table=table, keyvalues={id_col: stats_id, "ts": ts}, values=fields
)
def update_stats_delta(self, ts, stats_type, stats_id, field, value):
def _update_stats_delta(txn):
table, id_col = TYPE_TO_ROOM[stats_type]
sql = (
"SELECT * FROM %s"
" WHERE %s=? and ts=("
" SELECT MAX(ts) FROM %s"
" WHERE %s=?"
")"
) % (table, id_col, table, id_col)
txn.execute(sql, (stats_id, stats_id))
rows = self.cursor_to_dict(txn)
if len(rows) == 0:
# silently skip as we don't have anything to apply a delta to yet.
# this tries to minimise any race between the initial sync and
# subsequent deltas arriving.
return
current_ts = ts
latest_ts = rows[0]["ts"]
if current_ts < latest_ts:
# This one is in the past, but we're just encountering it now.
# Mark it as part of the current bucket.
current_ts = latest_ts
elif ts != latest_ts:
# we have to copy our absolute counters over to the new entry.
values = {
key: rows[0][key] for key in ABSOLUTE_STATS_FIELDS[stats_type]
}
values[id_col] = stats_id
values["ts"] = ts
values["bucket_size"] = self.stats_bucket_size
self._simple_insert_txn(txn, table=table, values=values)
# actually update the new value
if stats_type in ABSOLUTE_STATS_FIELDS[stats_type]:
self._simple_update_txn(
txn,
table=table,
keyvalues={id_col: stats_id, "ts": current_ts},
updatevalues={field: value},
abs_field_names = ABSOLUTE_STATS_FIELDS[stats_type]
slice_field_names = PER_SLICE_FIELDS[stats_type]
if absolute_field_overrides is None:
absolute_field_overrides = {}
for field in chain(fields.keys(), absolute_field_overrides.keys()):
if field not in abs_field_names and field not in slice_field_names:
# guard against potential SQL injection dodginess
raise ValueError(
"%s is not a recognised field"
" for stats type %s" % (field, stats_type)
)
# only absolute stats fields are tracked in the `_current` stats tables,
# so those are the only ones that we process deltas for when
# we upsert against the `_current` table.
# This calculates the deltas (`field = field + ?` values)
# for absolute fields,
# * defaulting to 0 if not specified
# (required for the INSERT part of upserting to work)
# * omitting overrides specified in `absolute_field_overrides`
deltas_of_absolute_fields = {
key: fields.get(key, 0)
for key in abs_field_names
if key not in absolute_field_overrides
}
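# For example (illustrative values): for stats_type "room", with
# fields={"joined_members": 1} and
# absolute_field_overrides={"current_state_events": 72}, this yields
# {"joined_members": 1, "invited_members": 0, "left_members": 0,
# "banned_members": 0, "state_events": 0, "total_events": 0}:
# every absolute field except the overridden one, defaulting to 0.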
if complete_with_stream_id is not None:
absolute_field_overrides = absolute_field_overrides.copy()
absolute_field_overrides[
"completed_delta_stream_id"
] = complete_with_stream_id
# first upsert the `_current` table
self._upsert_with_additive_relatives_txn(
txn=txn,
table=table + "_current",
keyvalues={id_col: stats_id},
absolutes=absolute_field_overrides,
additive_relatives=deltas_of_absolute_fields,
)
if self.has_completed_background_updates():
# TODO want to check specifically for stats regenerator, not all
# background updates…
# then upsert the `_historical` table.
# we don't support absolute_fields for per-slice fields as it makes
# no sense.
per_slice_additive_relatives = {
key: fields.get(key, 0) for key in slice_field_names
}
self._upsert_copy_from_table_with_additive_relatives_txn(
txn=txn,
into_table=table + "_historical",
keyvalues={id_col: stats_id},
extra_dst_keyvalues={
"end_ts": end_ts,
"bucket_size": self.stats_bucket_size,
},
additive_relatives=per_slice_additive_relatives,
src_table=table + "_current",
copy_columns=abs_field_names,
additional_where=" AND completed_delta_stream_id IS NOT NULL",
)
def _upsert_with_additive_relatives_txn(
self, txn, table, keyvalues, absolutes, additive_relatives
):
"""Used to update values in the stats tables.
Args:
txn: Transaction
table (str): Table name
keyvalues (dict[str, any]): Row-identifying key values
absolutes (dict[str, any]): Absolute (set) fields
additive_relatives (dict[str, int]): Fields that will be added onto
if existing row present.
"""
if self.database_engine.can_native_upsert:
absolute_updates = [
"%(field)s = EXCLUDED.%(field)s" % {"field": field}
for field in absolutes.keys()
]
relative_updates = [
"%(field)s = EXCLUDED.%(field)s + %(table)s.%(field)s"
% {"table": table, "field": field}
for field in additive_relatives.keys()
]
insert_cols = []
qargs = []
for (key, val) in chain(
keyvalues.items(), absolutes.items(), additive_relatives.items()
):
insert_cols.append(key)
qargs.append(val)
sql = """
INSERT INTO %(table)s (%(insert_cols_cs)s)
VALUES (%(insert_vals_qs)s)
ON CONFLICT (%(key_columns)s) DO UPDATE SET %(updates)s
""" % {
"table": table,
"key_columns": ", ".join(keyvalues),
"insert_cols_cs": ", ".join(insert_cols),
"insert_vals_qs": ", ".join(
["?"] * (len(keyvalues) + len(absolutes) + len(additive_relatives))
),
"updates": ", ".join(chain(absolute_updates, relative_updates)),
}
txn.execute(sql, qargs)
else:
self.database_engine.lock_table(txn, table)
retcols = chain(absolutes.keys(), additive_relatives.keys())
current_row = self._simple_select_one_txn(
txn, table, keyvalues, retcols, allow_none=True
)
if current_row is None:
merged_dict = {**keyvalues, **absolutes, **additive_relatives}
self._simple_insert_txn(txn, table, merged_dict)
else:
sql = ("UPDATE %s SET %s=%s+? WHERE %s=? AND ts=?") % (
table,
field,
field,
id_col,
)
txn.execute(sql, (value, stats_id, current_ts))
for (key, val) in additive_relatives.items():
current_row[key] += val
current_row.update(absolutes)
self._simple_update_one_txn(txn, table, keyvalues, current_row)
return self.runInteraction("update_stats_delta", _update_stats_delta)
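To make the merge semantics concrete, here is a minimal standalone sketch of what the non-native-upsert branch does to an existing row (the function and values are illustrative, not part of this diff):

def merge_additive(current_row, absolutes, additive_relatives):
    # Absolutes overwrite the stored value; relatives are added onto it.
    merged = dict(current_row)
    for key, delta in additive_relatives.items():
        merged[key] = merged.get(key, 0) + delta
    merged.update(absolutes)
    return merged

assert merge_additive(
    {"joined_members": 5, "completed_delta_stream_id": None},
    absolutes={"completed_delta_stream_id": 60},
    additive_relatives={"joined_members": 1},
) == {"joined_members": 6, "completed_delta_stream_id": 60}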
def _upsert_copy_from_table_with_additive_relatives_txn(
self,
txn,
into_table,
keyvalues,
extra_dst_keyvalues,
additive_relatives,
src_table,
copy_columns,
additional_where="",
):
"""
Args:
txn: Transaction
into_table (str): The destination table to UPSERT the row into
keyvalues (dict[str, any]): Row-identifying key values
extra_dst_keyvalues (dict[str, any]): Additional keyvalues
for `into_table`.
additive_relatives (dict[str, any]): Fields that will be added onto
if existing row present. (Must be disjoint from copy_columns.)
src_table (str): The source table to copy from
copy_columns (iterable[str]): The list of columns to copy
additional_where (str): Additional SQL for where (prefix with AND
if using).
"""
if self.database_engine.can_native_upsert:
ins_columns = chain(
keyvalues, copy_columns, additive_relatives, extra_dst_keyvalues
)
sel_exprs = chain(
keyvalues,
copy_columns,
("?" for _ in chain(additive_relatives, extra_dst_keyvalues)),
)
keyvalues_where = ("%s = ?" % f for f in keyvalues)
sets_cc = ("%s = EXCLUDED.%s" % (f, f) for f in copy_columns)
sets_ar = (
"%s = EXCLUDED.%s + %s.%s" % (f, f, into_table, f)
for f in additive_relatives
)
sql = """
INSERT INTO %(into_table)s (%(ins_columns)s)
SELECT %(sel_exprs)s
FROM %(src_table)s
WHERE %(keyvalues_where)s %(additional_where)s
ON CONFLICT (%(keyvalues)s)
DO UPDATE SET %(sets)s
""" % {
"into_table": into_table,
"ins_columns": ", ".join(ins_columns),
"sel_exprs": ", ".join(sel_exprs),
"keyvalues_where": " AND ".join(keyvalues_where),
"src_table": src_table,
"keyvalues": ", ".join(
chain(keyvalues.keys(), extra_dst_keyvalues.keys())
),
"sets": ", ".join(chain(sets_cc, sets_ar)),
"additional_where": additional_where,
}
qargs = list(
chain(additive_relatives.values(), extra_dst_keyvalues.values(), keyvalues.values())
)
txn.execute(sql, qargs)
else:
self.database_engine.lock_table(txn, into_table)
src_row = self._simple_select_one_txn(
txn, src_table, keyvalues, copy_columns
)
dest_current_row = self._simple_select_one_txn(
txn,
into_table,
keyvalues,
chain(additive_relatives.keys(), copy_columns),
allow_none=True,
)
if dest_current_row is None:
merged_dict = {**keyvalues, **extra_dst_keyvalues, **src_row, **additive_relatives}
self._simple_insert_txn(txn, into_table, merged_dict)
else:
for (key, val) in additive_relatives.items():
src_row[key] = dest_current_row[key] + val
self._simple_update_txn(txn, into_table, keyvalues, src_row)

View File

@@ -37,11 +37,9 @@ class ApplicationServiceSchedulerTransactionCtrlTestCase(unittest.TestCase):
self.recoverer = Mock()
self.recoverer_fn = Mock(return_value=self.recoverer)
self.txnctrl = _TransactionController(
clock=self.clock,
store=self.store,
as_api=self.as_api,
recoverer_fn=self.recoverer_fn,
clock=self.clock, store=self.store, as_api=self.as_api
)
self.txnctrl.RECOVERER_CLASS = self.recoverer_fn
def test_single_service_up_txn_sent(self):
# Test: The AS is up and the txn is successfully sent.

View File

@@ -1,304 +0,0 @@
# -*- coding: utf-8 -*-
# Copyright 2019 New Vector Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from mock import Mock
from twisted.internet import defer
from synapse.api.constants import EventTypes, Membership
from synapse.rest import admin
from synapse.rest.client.v1 import login, room
from tests import unittest
class StatsRoomTests(unittest.HomeserverTestCase):
servlets = [
admin.register_servlets_for_client_rest_resource,
room.register_servlets,
login.register_servlets,
]
def prepare(self, reactor, clock, hs):
self.store = hs.get_datastore()
self.handler = self.hs.get_stats_handler()
def _add_background_updates(self):
"""
Add the background updates we need to run.
"""
# Ugh, have to reset this flag
self.store._all_done = False
self.get_success(
self.store._simple_insert(
"background_updates",
{"update_name": "populate_stats_createtables", "progress_json": "{}"},
)
)
self.get_success(
self.store._simple_insert(
"background_updates",
{
"update_name": "populate_stats_process_rooms",
"progress_json": "{}",
"depends_on": "populate_stats_createtables",
},
)
)
self.get_success(
self.store._simple_insert(
"background_updates",
{
"update_name": "populate_stats_cleanup",
"progress_json": "{}",
"depends_on": "populate_stats_process_rooms",
},
)
)
def test_initial_room(self):
"""
The background updates will build the table from scratch.
"""
r = self.get_success(self.store.get_all_room_state())
self.assertEqual(len(r), 0)
# Disable stats
self.hs.config.stats_enabled = False
self.handler.stats_enabled = False
u1 = self.register_user("u1", "pass")
u1_token = self.login("u1", "pass")
room_1 = self.helper.create_room_as(u1, tok=u1_token)
self.helper.send_state(
room_1, event_type="m.room.topic", body={"topic": "foo"}, tok=u1_token
)
# Stats disabled, shouldn't have done anything
r = self.get_success(self.store.get_all_room_state())
self.assertEqual(len(r), 0)
# Enable stats
self.hs.config.stats_enabled = True
self.handler.stats_enabled = True
# Do the initial population of the user directory via the background update
self._add_background_updates()
while not self.get_success(self.store.has_completed_background_updates()):
self.get_success(self.store.do_next_background_update(100), by=0.1)
r = self.get_success(self.store.get_all_room_state())
self.assertEqual(len(r), 1)
self.assertEqual(r[0]["topic"], "foo")
def test_initial_earliest_token(self):
"""
Ingestion via notify_new_event will ignore tokens that the background
update have already processed.
"""
self.reactor.advance(86401)
self.hs.config.stats_enabled = False
self.handler.stats_enabled = False
u1 = self.register_user("u1", "pass")
u1_token = self.login("u1", "pass")
u2 = self.register_user("u2", "pass")
u2_token = self.login("u2", "pass")
u3 = self.register_user("u3", "pass")
u3_token = self.login("u3", "pass")
room_1 = self.helper.create_room_as(u1, tok=u1_token)
self.helper.send_state(
room_1, event_type="m.room.topic", body={"topic": "foo"}, tok=u1_token
)
# Begin the ingestion by creating the temp tables. This will also store
# the position that the deltas should begin at, once they take over.
self.hs.config.stats_enabled = True
self.handler.stats_enabled = True
self.store._all_done = False
self.get_success(self.store.update_stats_stream_pos(None))
self.get_success(
self.store._simple_insert(
"background_updates",
{"update_name": "populate_stats_createtables", "progress_json": "{}"},
)
)
while not self.get_success(self.store.has_completed_background_updates()):
self.get_success(self.store.do_next_background_update(100), by=0.1)
# Now, before the table is actually ingested, add some more events.
self.helper.invite(room=room_1, src=u1, targ=u2, tok=u1_token)
self.helper.join(room=room_1, user=u2, tok=u2_token)
# Now do the initial ingestion.
self.get_success(
self.store._simple_insert(
"background_updates",
{"update_name": "populate_stats_process_rooms", "progress_json": "{}"},
)
)
self.get_success(
self.store._simple_insert(
"background_updates",
{
"update_name": "populate_stats_cleanup",
"progress_json": "{}",
"depends_on": "populate_stats_process_rooms",
},
)
)
self.store._all_done = False
while not self.get_success(self.store.has_completed_background_updates()):
self.get_success(self.store.do_next_background_update(100), by=0.1)
self.reactor.advance(86401)
# Now add some more events, triggering ingestion. Because of the stream
# position being set to before the events sent in the middle, a simpler
# implementation would reprocess those events, and say there were four
# users, not three.
self.helper.invite(room=room_1, src=u1, targ=u3, tok=u1_token)
self.helper.join(room=room_1, user=u3, tok=u3_token)
# Get the deltas! There should be two -- day 1, and day 2.
r = self.get_success(self.store.get_deltas_for_room(room_1, 0))
# The oldest has 2 joined members
self.assertEqual(r[-1]["joined_members"], 2)
# The newest has 3
self.assertEqual(r[0]["joined_members"], 3)
def test_incorrect_state_transition(self):
"""
If the state transition is not one of (JOIN, INVITE, LEAVE, BAN) to
(JOIN, INVITE, LEAVE, BAN), an error is raised.
"""
events = {
"a1": {"membership": Membership.LEAVE},
"a2": {"membership": "not a real thing"},
}
def get_event(event_id, allow_none=True):
m = Mock()
m.content = events[event_id]
d = defer.Deferred()
self.reactor.callLater(0.0, d.callback, m)
return d
def get_received_ts(event_id):
return defer.succeed(1)
self.store.get_received_ts = get_received_ts
self.store.get_event = get_event
deltas = [
{
"type": EventTypes.Member,
"state_key": "some_user",
"room_id": "room",
"event_id": "a1",
"prev_event_id": "a2",
"stream_id": 60,
}
]
f = self.get_failure(self.handler._handle_deltas(deltas), ValueError)
self.assertEqual(
f.value.args[0], "'not a real thing' is not a valid prev_membership"
)
# And the other way...
deltas = [
{
"type": EventTypes.Member,
"state_key": "some_user",
"room_id": "room",
"event_id": "a2",
"prev_event_id": "a1",
"stream_id": 100,
}
]
f = self.get_failure(self.handler._handle_deltas(deltas), ValueError)
self.assertEqual(
f.value.args[0], "'not a real thing' is not a valid membership"
)
def test_redacted_prev_event(self):
"""
If the prev_event does not exist, then it is assumed to be a LEAVE.
"""
u1 = self.register_user("u1", "pass")
u1_token = self.login("u1", "pass")
room_1 = self.helper.create_room_as(u1, tok=u1_token)
# Do the initial population of the user directory via the background update
self._add_background_updates()
while not self.get_success(self.store.has_completed_background_updates()):
self.get_success(self.store.do_next_background_update(100), by=0.1)
events = {"a1": None, "a2": {"membership": Membership.JOIN}}
def get_event(event_id, allow_none=True):
if events.get(event_id):
m = Mock()
m.content = events[event_id]
else:
m = None
d = defer.Deferred()
self.reactor.callLater(0.0, d.callback, m)
return d
def get_received_ts(event_id):
return defer.succeed(1)
self.store.get_received_ts = get_received_ts
self.store.get_event = get_event
deltas = [
{
"type": EventTypes.Member,
"state_key": "some_user:test",
"room_id": room_1,
"event_id": "a2",
"prev_event_id": "a1",
"stream_id": 100,
}
]
# Handle our fake deltas, which has a user going from LEAVE -> JOIN.
self.get_success(self.handler._handle_deltas(deltas))
# One delta, with two joined members -- the room creator, and our fake
# user.
r = self.get_success(self.store.get_deltas_for_room(room_1, 0))
self.assertEqual(len(r), 1)
self.assertEqual(r[0]["joined_members"], 2)

View File

@@ -73,8 +73,6 @@ class MatrixFederationAgentTests(TestCase):
self.mock_resolver = Mock()
self.well_known_cache = TTLCache("test_cache", timer=self.reactor.seconds)
config_dict = default_config("test", parse=False)
config_dict["federation_custom_ca_list"] = [get_test_ca_cert_file()]
@@ -82,11 +80,21 @@ class MatrixFederationAgentTests(TestCase):
config.parse_config_dict(config_dict, "", "")
self.tls_factory = ClientTLSOptionsFactory(config)
self.well_known_cache = TTLCache("test_cache", timer=self.reactor.seconds)
self.had_well_known_cache = TTLCache("test_cache", timer=self.reactor.seconds)
self.well_known_resolver = WellKnownResolver(
self.reactor,
Agent(self.reactor, contextFactory=self.tls_factory),
well_known_cache=self.well_known_cache,
had_well_known_cache=self.had_well_known_cache,
)
self.agent = MatrixFederationAgent(
reactor=self.reactor,
tls_client_options_factory=self.tls_factory,
_srv_resolver=self.mock_resolver,
_well_known_cache=self.well_known_cache,
_well_known_resolver=self.well_known_resolver,
)
def _make_connection(self, client_factory, expected_sni):
@@ -543,7 +551,7 @@ class MatrixFederationAgentTests(TestCase):
self.assertEqual(self.well_known_cache[b"testserv"], b"target-server")
# check the cache expires
self.reactor.pump((25 * 3600,))
self.reactor.pump((48 * 3600,))
self.well_known_cache.expire()
self.assertNotIn(b"testserv", self.well_known_cache)
@@ -631,7 +639,7 @@ class MatrixFederationAgentTests(TestCase):
self.assertEqual(self.well_known_cache[b"testserv"], b"target-server")
# check the cache expires
self.reactor.pump((25 * 3600,))
self.reactor.pump((48 * 3600,))
self.well_known_cache.expire()
self.assertNotIn(b"testserv", self.well_known_cache)
@@ -701,11 +709,18 @@ class MatrixFederationAgentTests(TestCase):
config = default_config("test", parse=True)
# Build a new agent and WellKnownResolver with a different tls factory
tls_factory = ClientTLSOptionsFactory(config)
agent = MatrixFederationAgent(
reactor=self.reactor,
tls_client_options_factory=ClientTLSOptionsFactory(config),
tls_client_options_factory=tls_factory,
_srv_resolver=self.mock_resolver,
_well_known_cache=self.well_known_cache,
_well_known_resolver=WellKnownResolver(
self.reactor,
Agent(self.reactor, contextFactory=tls_factory),
well_known_cache=self.well_known_cache,
had_well_known_cache=self.had_well_known_cache,
),
)
test_d = agent.request(b"GET", b"matrix://testserv/foo/bar")
@@ -932,15 +947,9 @@ class MatrixFederationAgentTests(TestCase):
self.successResultOf(test_d)
def test_well_known_cache(self):
well_known_resolver = WellKnownResolver(
self.reactor,
Agent(self.reactor, contextFactory=self.tls_factory),
well_known_cache=self.well_known_cache,
)
self.reactor.lookups["testserv"] = "1.2.3.4"
fetch_d = well_known_resolver.get_well_known(b"testserv")
fetch_d = self.well_known_resolver.get_well_known(b"testserv")
# there should be an attempt to connect on port 443 for the .well-known
clients = self.reactor.tcpClients
@@ -963,7 +972,7 @@ class MatrixFederationAgentTests(TestCase):
well_known_server.loseConnection()
# repeat the request: it should hit the cache
fetch_d = well_known_resolver.get_well_known(b"testserv")
fetch_d = self.well_known_resolver.get_well_known(b"testserv")
r = self.successResultOf(fetch_d)
self.assertEqual(r.delegated_server, b"target-server")
@@ -971,7 +980,7 @@ class MatrixFederationAgentTests(TestCase):
self.reactor.pump((1000.0,))
# now it should connect again
fetch_d = well_known_resolver.get_well_known(b"testserv")
fetch_d = self.well_known_resolver.get_well_known(b"testserv")
self.assertEqual(len(clients), 1)
(host, port, client_factory, _timeout, _bindAddress) = clients.pop(0)
@@ -992,15 +1001,9 @@ class MatrixFederationAgentTests(TestCase):
it ignores transient errors.
"""
well_known_resolver = WellKnownResolver(
self.reactor,
Agent(self.reactor, contextFactory=self.tls_factory),
well_known_cache=self.well_known_cache,
)
self.reactor.lookups["testserv"] = "1.2.3.4"
fetch_d = well_known_resolver.get_well_known(b"testserv")
fetch_d = self.well_known_resolver.get_well_known(b"testserv")
# there should be an attempt to connect on port 443 for the .well-known
clients = self.reactor.tcpClients
@@ -1026,27 +1029,37 @@ class MatrixFederationAgentTests(TestCase):
# another lookup.
self.reactor.pump((900.0,))
fetch_d = well_known_resolver.get_well_known(b"testserv")
clients = self.reactor.tcpClients
(host, port, client_factory, _timeout, _bindAddress) = clients.pop(0)
fetch_d = self.well_known_resolver.get_well_known(b"testserv")
# fonx the connection attempt, this will be treated as a temporary
# failure.
client_factory.clientConnectionFailed(None, Exception("nope"))
# The resolver may retry a few times, so fonx all requests that come along
attempts = 0
while self.reactor.tcpClients:
clients = self.reactor.tcpClients
(host, port, client_factory, _timeout, _bindAddress) = clients.pop(0)
# attemptdelay on the hostnameendpoint is 0.3, so takes that long before the
# .well-known request fails.
self.reactor.pump((0.4,))
attempts += 1
# fonx the connection attempt, this will be treated as a temporary
# failure.
client_factory.clientConnectionFailed(None, Exception("nope"))
# There's a few sleeps involved, so we have to pump the reactor a
# bit.
self.reactor.pump((1.0, 1.0))
# We expect to see more than one attempt as there was previously a valid
# well known.
self.assertGreater(attempts, 1)
# Resolver should return cached value, despite the lookup failing.
r = self.successResultOf(fetch_d)
self.assertEqual(r.delegated_server, b"target-server")
# Expire the cache and repeat the request
self.reactor.pump((100.0,))
# Expire both caches and repeat the request
self.reactor.pump((10000.0,))
# Repeat the request; this time it should fail if the lookup fails.
fetch_d = well_known_resolver.get_well_known(b"testserv")
fetch_d = self.well_known_resolver.get_well_known(b"testserv")
clients = self.reactor.tcpClients
(host, port, client_factory, _timeout, _bindAddress) = clients.pop(0)