1
0

Merge commit '88e1d0c52' into anoa/dinsic_release_1_23_1

This commit is contained in:
Andrew Morgan
2020-12-31 13:40:45 +00:00
22 changed files with 588 additions and 41 deletions

View File

@@ -57,7 +57,7 @@ light workloads.
System requirements:
- POSIX-compliant system (tested on Linux & OS X)
- Python 3.5.2 or later, up to Python 3.8.
- Python 3.5.2 or later, up to Python 3.9.
- At least 1GB of free RAM if you want to join large public rooms like #matrix:matrix.org
Synapse is written in Python but some of the libraries it uses are written in

1
changelog.d/8647.feature Normal file
View File

@@ -0,0 +1 @@
Add an admin API `GET /_synapse/admin/v1/users/<user_id>/media` to get information about uploaded media. Contributed by @dklimpel.

1
changelog.d/8665.doc Normal file
View File

@@ -0,0 +1 @@
Note support for Python 3.9.

1
changelog.d/8667.doc Normal file
View File

@@ -0,0 +1 @@
Interlink prometheus/grafana documentation.

1
changelog.d/8668.misc Normal file
View File

@@ -0,0 +1 @@
Reduce number of OpenTracing spans started.

1
changelog.d/8670.misc Normal file
View File

@@ -0,0 +1 @@
Reduce number of OpenTracing spans started.

1
changelog.d/8671.misc Normal file
View File

@@ -0,0 +1 @@
Abstract some invite-related code in preparation for landing knocking.

View File

@@ -3,4 +3,4 @@
0. Set up Prometheus and Grafana. Out of scope for this readme. Useful documentation about using Grafana with Prometheus: http://docs.grafana.org/features/datasources/prometheus/
1. Have your Prometheus scrape your Synapse. https://github.com/matrix-org/synapse/blob/master/docs/metrics-howto.md
2. Import dashboard into Grafana. Download `synapse.json`. Import it to Grafana and select the correct Prometheus datasource. http://docs.grafana.org/reference/export_import/
3. Set up additional recording rules
3. Set up required recording rules. https://github.com/matrix-org/synapse/tree/master/contrib/prometheus

View File

@@ -341,6 +341,89 @@ The following fields are returned in the JSON response body:
- ``total`` - Number of rooms.
List media of an user
================================
Gets a list of all local media that a specific ``user_id`` has created.
The response is ordered by creation date descending and media ID descending.
The newest media is on top.
The API is::
GET /_synapse/admin/v1/users/<user_id>/media
To use it, you will need to authenticate by providing an ``access_token`` for a
server admin: see `README.rst <README.rst>`_.
A response body like the following is returned:
.. code:: json
{
"media": [
{
"created_ts": 100400,
"last_access_ts": null,
"media_id": "qXhyRzulkwLsNHTbpHreuEgo",
"media_length": 67,
"media_type": "image/png",
"quarantined_by": null,
"safe_from_quarantine": false,
"upload_name": "test1.png"
},
{
"created_ts": 200400,
"last_access_ts": null,
"media_id": "FHfiSnzoINDatrXHQIXBtahw",
"media_length": 67,
"media_type": "image/png",
"quarantined_by": null,
"safe_from_quarantine": false,
"upload_name": "test2.png"
}
],
"next_token": 3,
"total": 2
}
To paginate, check for ``next_token`` and if present, call the endpoint again
with ``from`` set to the value of ``next_token``. This will return a new page.
If the endpoint does not return a ``next_token`` then there are no more
reports to paginate through.
**Parameters**
The following parameters should be set in the URL:
- ``user_id`` - string - fully qualified: for example, ``@user:server.com``.
- ``limit``: string representing a positive integer - Is optional but is used for pagination,
denoting the maximum number of items to return in this call. Defaults to ``100``.
- ``from``: string representing a positive integer - Is optional but used for pagination,
denoting the offset in the returned results. This should be treated as an opaque value and
not explicitly set to anything other than the return value of ``next_token`` from a previous call.
Defaults to ``0``.
**Response**
The following fields are returned in the JSON response body:
- ``media`` - An array of objects, each containing information about a media.
Media objects contain the following fields:
- ``created_ts`` - integer - Timestamp when the content was uploaded in ms.
- ``last_access_ts`` - integer - Timestamp when the content was last accessed in ms.
- ``media_id`` - string - The id used to refer to the media.
- ``media_length`` - integer - Length of the media in bytes.
- ``media_type`` - string - The MIME-type of the media.
- ``quarantined_by`` - string - The user ID that initiated the quarantine request
for this media.
- ``safe_from_quarantine`` - bool - Status if this media is safe from quarantining.
- ``upload_name`` - string - The name the media was uploaded with.
- ``next_token``: integer - Indication for pagination. See above.
- ``total`` - integer - Total number of media.
User devices
============

View File

@@ -60,6 +60,8 @@
1. Restart Prometheus.
1. Consider using the [grafana dashboard](https://github.com/matrix-org/synapse/tree/master/contrib/grafana/) and required [recording rules](https://github.com/matrix-org/synapse/tree/master/contrib/prometheus/)
## Monitoring workers
To monitor a Synapse installation using

View File

@@ -131,6 +131,7 @@ setup(
"Programming Language :: Python :: 3.6",
"Programming Language :: Python :: 3.7",
"Programming Language :: Python :: 3.8",
"Programming Language :: Python :: 3.9",
],
scripts=["synctl"] + glob.glob("scripts/*"),
cmdclass={"test": TestCommand},

View File

@@ -1100,34 +1100,13 @@ class EventCreationHandler:
if event.type == EventTypes.Member:
if event.content["membership"] == Membership.INVITE:
def is_inviter_member_event(e):
return e.type == EventTypes.Member and e.sender == event.sender
current_state_ids = await context.get_current_state_ids()
# We know this event is not an outlier, so this must be
# non-None.
assert current_state_ids is not None
state_to_include_ids = [
e_id
for k, e_id in current_state_ids.items()
if k[0] in self.room_invite_state_types
or k == (EventTypes.Member, event.sender)
]
state_to_include = await self.store.get_events(state_to_include_ids)
event.unsigned["invite_room_state"] = [
{
"type": e.type,
"state_key": e.state_key,
"content": e.content,
"sender": e.sender,
}
for e in state_to_include.values()
]
event.unsigned[
"invite_room_state"
] = await self.store.get_stripped_room_state_from_event_context(
context,
self.room_invite_state_types,
membership_user_id=event.sender,
)
invitee = UserID.from_string(event.state_key)
if not self.hs.is_mine(invitee):

View File

@@ -167,20 +167,25 @@ class FollowerTypingHandler:
now_typing = set(row.user_ids)
self._room_typing[row.room_id] = row.user_ids
run_as_background_process(
"_handle_change_in_typing",
self._handle_change_in_typing,
row.room_id,
prev_typing,
now_typing,
)
if self.federation:
run_as_background_process(
"_send_changes_in_typing_to_remotes",
self._send_changes_in_typing_to_remotes,
row.room_id,
prev_typing,
now_typing,
)
async def _handle_change_in_typing(
async def _send_changes_in_typing_to_remotes(
self, room_id: str, prev_typing: Set[str], now_typing: Set[str]
):
"""Process a change in typing of a room from replication, sending EDUs
for any local users.
"""
if not self.federation:
return
for user_id in now_typing - prev_typing:
if self.is_mine_id(user_id):
await self._push_remote(RoomMember(room_id, user_id), True)

View File

@@ -117,6 +117,16 @@ class ReplicationStreamer:
stream.discard_updates_and_advance()
return
# We check up front to see if anything has actually changed, as we get
# poked because of changes that happened on other instances.
if all(
stream.last_token == stream.current_token(self._instance_name)
for stream in self.streams
):
return
# If there are updates then we need to set this even if we're already
# looping, as the loop needs to know that he might need to loop again.
self.pending_updates = True
if self.is_looping:

View File

@@ -53,6 +53,7 @@ from synapse.rest.admin.users import (
ResetPasswordRestServlet,
SearchUsersRestServlet,
UserAdminServlet,
UserMediaRestServlet,
UserMembershipRestServlet,
UserRegisterServlet,
UserRestServletV2,
@@ -218,6 +219,7 @@ def register_servlets(hs, http_server):
SendServerNoticeServlet(hs).register(http_server)
VersionServlet(hs).register(http_server)
UserAdminServlet(hs).register(http_server)
UserMediaRestServlet(hs).register(http_server)
UserMembershipRestServlet(hs).register(http_server)
UserRestServletV2(hs).register(http_server)
UsersRestServletV2(hs).register(http_server)

View File

@@ -16,6 +16,7 @@ import hashlib
import hmac
import logging
from http import HTTPStatus
from typing import Tuple
from synapse.api.constants import UserTypes
from synapse.api.errors import Codes, NotFoundError, SynapseError
@@ -27,13 +28,14 @@ from synapse.http.servlet import (
parse_json_object_from_request,
parse_string,
)
from synapse.http.site import SynapseRequest
from synapse.rest.admin._base import (
admin_patterns,
assert_requester_is_admin,
assert_user_is_admin,
historical_admin_path_patterns,
)
from synapse.types import UserID
from synapse.types import JsonDict, UserID
logger = logging.getLogger(__name__)
@@ -709,3 +711,66 @@ class UserMembershipRestServlet(RestServlet):
room_ids = await self.store.get_rooms_for_user(user_id)
ret = {"joined_rooms": list(room_ids), "total": len(room_ids)}
return 200, ret
class UserMediaRestServlet(RestServlet):
"""
Gets information about all uploaded local media for a specific `user_id`.
Example:
http://localhost:8008/_synapse/admin/v1/users/
@user:server/media
Args:
The parameters `from` and `limit` are required for pagination.
By default, a `limit` of 100 is used.
Returns:
A list of media and an integer representing the total number of
media that exist given for this user
"""
PATTERNS = admin_patterns("/users/(?P<user_id>[^/]+)/media$")
def __init__(self, hs):
self.is_mine = hs.is_mine
self.auth = hs.get_auth()
self.store = hs.get_datastore()
async def on_GET(
self, request: SynapseRequest, user_id: str
) -> Tuple[int, JsonDict]:
await assert_requester_is_admin(self.auth, request)
if not self.is_mine(UserID.from_string(user_id)):
raise SynapseError(400, "Can only lookup local users")
user = await self.store.get_user_by_id(user_id)
if user is None:
raise NotFoundError("Unknown user")
start = parse_integer(request, "from", default=0)
limit = parse_integer(request, "limit", default=100)
if start < 0:
raise SynapseError(
400,
"Query parameter from must be a string representing a positive integer.",
errcode=Codes.INVALID_PARAM,
)
if limit < 0:
raise SynapseError(
400,
"Query parameter limit must be a string representing a positive integer.",
errcode=Codes.INVALID_PARAM,
)
media, total = await self.store.get_local_media_by_user_paginate(
start, limit, user_id
)
ret = {"media": media, "total": total}
if (start + limit) < total:
ret["next_token"] = start + len(media)
return 200, ret

View File

@@ -92,6 +92,13 @@ class EventsBackgroundUpdatesStore(SQLBaseStore):
where_clause="NOT have_censored",
)
self.db_pool.updates.register_background_index_update(
"users_have_local_media",
index_name="users_have_local_media",
table="local_media_repository",
columns=["user_id", "created_ts"],
)
async def _background_reindex_fields_sender(self, progress, batch_size):
target_min_stream_id = progress["target_min_stream_id_inclusive"]
max_stream_id = progress["max_stream_id_exclusive"]

View File

@@ -31,6 +31,7 @@ from synapse.api.room_versions import (
RoomVersions,
)
from synapse.events import EventBase, make_event_from_dict
from synapse.events.snapshot import EventContext
from synapse.events.utils import prune_event
from synapse.logging.context import PreserveLoggingContext, current_context
from synapse.metrics.background_process_metrics import (
@@ -44,7 +45,7 @@ from synapse.storage._base import SQLBaseStore, db_to_json, make_in_list_sql_cla
from synapse.storage.database import DatabasePool
from synapse.storage.engines import PostgresEngine
from synapse.storage.util.id_generators import MultiWriterIdGenerator, StreamIdGenerator
from synapse.types import Collection, get_domain_from_id
from synapse.types import Collection, JsonDict, get_domain_from_id
from synapse.util.caches.descriptors import cached
from synapse.util.caches.lrucache import LruCache
from synapse.util.iterutils import batch_iter
@@ -525,6 +526,57 @@ class EventsWorkerStore(SQLBaseStore):
return event_map
async def get_stripped_room_state_from_event_context(
self,
context: EventContext,
state_types_to_include: List[EventTypes],
membership_user_id: Optional[str],
) -> List[JsonDict]:
"""
Retrieve the stripped state from a room, given an event context to retrieve state
from as well as the state types to include. Optionally, include the membership
events from a specific user.
"Stripped" state means that only the `type`, `state_key`, `content` and `sender` keys
are included from each state event.
Args:
context: The event context to retrieve state of the room from.
state_types_to_include: The type of state events to include.
membership_user_id: An optional user ID to include the stripped membership state
events of. This is useful when generating the stripped state of a room for
invites. We want to send membership events of the inviter, so that the
invitee can display the inviter's profile information if the room lacks any.
Returns:
A list of dictionaries, each representing a stripped state event from the room.
"""
current_state_ids = await context.get_current_state_ids()
# We know this event is not an outlier, so this must be
# non-None.
assert current_state_ids is not None
# The state to include
state_to_include_ids = [
e_id
for k, e_id in current_state_ids.items()
if k[0] in state_types_to_include
or (membership_user_id and k == (EventTypes.Member, membership_user_id))
]
state_to_include = await self.get_events(state_to_include_ids)
return [
{
"type": e.type,
"state_key": e.state_key,
"content": e.content,
"sender": e.sender,
}
for e in state_to_include.values()
]
def _do_fetch(self, conn):
"""Takes a database connection and waits for requests for events from
the _event_fetch_list queue.

View File

@@ -116,6 +116,57 @@ class MediaRepositoryStore(MediaRepositoryBackgroundUpdateStore):
desc="get_local_media",
)
async def get_local_media_by_user_paginate(
self, start: int, limit: int, user_id: str
) -> Tuple[List[Dict[str, Any]], int]:
"""Get a paginated list of metadata for a local piece of media
which an user_id has uploaded
Args:
start: offset in the list
limit: maximum amount of media_ids to retrieve
user_id: fully-qualified user id
Returns:
A paginated list of all metadata of user's media,
plus the total count of all the user's media
"""
def get_local_media_by_user_paginate_txn(txn):
args = [user_id]
sql = """
SELECT COUNT(*) as total_media
FROM local_media_repository
WHERE user_id = ?
"""
txn.execute(sql, args)
count = txn.fetchone()[0]
sql = """
SELECT
"media_id",
"media_type",
"media_length",
"upload_name",
"created_ts",
"last_access_ts",
"quarantined_by",
"safe_from_quarantine"
FROM local_media_repository
WHERE user_id = ?
ORDER BY created_ts DESC, media_id DESC
LIMIT ? OFFSET ?
"""
args += [limit, start]
txn.execute(sql, args)
media = self.db_pool.cursor_to_dict(txn)
return media, count
return await self.db_pool.runInteraction(
"get_local_media_by_user_paginate_txn", get_local_media_by_user_paginate_txn
)
async def get_local_media_before(
self, before_ts: int, size_gt: int, keep_profiles: bool,
) -> Optional[List[str]]:

View File

@@ -0,0 +1,2 @@
INSERT INTO background_updates (update_name, progress_json) VALUES
('users_have_local_media', '{}');

View File

@@ -17,6 +17,7 @@ import hashlib
import hmac
import json
import urllib.parse
from binascii import unhexlify
from mock import Mock
@@ -1115,3 +1116,284 @@ class UserMembershipRestTestCase(unittest.HomeserverTestCase):
self.assertEqual(200, channel.code, msg=channel.json_body)
self.assertEqual(number_rooms, channel.json_body["total"])
self.assertEqual(number_rooms, len(channel.json_body["joined_rooms"]))
class UserMediaRestTestCase(unittest.HomeserverTestCase):
servlets = [
synapse.rest.admin.register_servlets,
login.register_servlets,
]
def prepare(self, reactor, clock, hs):
self.store = hs.get_datastore()
self.media_repo = hs.get_media_repository_resource()
self.admin_user = self.register_user("admin", "pass", admin=True)
self.admin_user_tok = self.login("admin", "pass")
self.other_user = self.register_user("user", "pass")
self.url = "/_synapse/admin/v1/users/%s/media" % urllib.parse.quote(
self.other_user
)
def test_no_auth(self):
"""
Try to list media of an user without authentication.
"""
request, channel = self.make_request("GET", self.url, b"{}")
self.render(request)
self.assertEqual(401, int(channel.result["code"]), msg=channel.result["body"])
self.assertEqual(Codes.MISSING_TOKEN, channel.json_body["errcode"])
def test_requester_is_no_admin(self):
"""
If the user is not a server admin, an error is returned.
"""
other_user_token = self.login("user", "pass")
request, channel = self.make_request(
"GET", self.url, access_token=other_user_token,
)
self.render(request)
self.assertEqual(403, int(channel.result["code"]), msg=channel.result["body"])
self.assertEqual(Codes.FORBIDDEN, channel.json_body["errcode"])
def test_user_does_not_exist(self):
"""
Tests that a lookup for a user that does not exist returns a 404
"""
url = "/_synapse/admin/v1/users/@unknown_person:test/media"
request, channel = self.make_request(
"GET", url, access_token=self.admin_user_tok,
)
self.render(request)
self.assertEqual(404, channel.code, msg=channel.json_body)
self.assertEqual(Codes.NOT_FOUND, channel.json_body["errcode"])
def test_user_is_not_local(self):
"""
Tests that a lookup for a user that is not a local returns a 400
"""
url = "/_synapse/admin/v1/users/@unknown_person:unknown_domain/media"
request, channel = self.make_request(
"GET", url, access_token=self.admin_user_tok,
)
self.render(request)
self.assertEqual(400, channel.code, msg=channel.json_body)
self.assertEqual("Can only lookup local users", channel.json_body["error"])
def test_limit(self):
"""
Testing list of media with limit
"""
number_media = 20
other_user_tok = self.login("user", "pass")
self._create_media(other_user_tok, number_media)
request, channel = self.make_request(
"GET", self.url + "?limit=5", access_token=self.admin_user_tok,
)
self.render(request)
self.assertEqual(200, int(channel.result["code"]), msg=channel.result["body"])
self.assertEqual(channel.json_body["total"], number_media)
self.assertEqual(len(channel.json_body["media"]), 5)
self.assertEqual(channel.json_body["next_token"], 5)
self._check_fields(channel.json_body["media"])
def test_from(self):
"""
Testing list of media with a defined starting point (from)
"""
number_media = 20
other_user_tok = self.login("user", "pass")
self._create_media(other_user_tok, number_media)
request, channel = self.make_request(
"GET", self.url + "?from=5", access_token=self.admin_user_tok,
)
self.render(request)
self.assertEqual(200, int(channel.result["code"]), msg=channel.result["body"])
self.assertEqual(channel.json_body["total"], number_media)
self.assertEqual(len(channel.json_body["media"]), 15)
self.assertNotIn("next_token", channel.json_body)
self._check_fields(channel.json_body["media"])
def test_limit_and_from(self):
"""
Testing list of media with a defined starting point and limit
"""
number_media = 20
other_user_tok = self.login("user", "pass")
self._create_media(other_user_tok, number_media)
request, channel = self.make_request(
"GET", self.url + "?from=5&limit=10", access_token=self.admin_user_tok,
)
self.render(request)
self.assertEqual(200, int(channel.result["code"]), msg=channel.result["body"])
self.assertEqual(channel.json_body["total"], number_media)
self.assertEqual(channel.json_body["next_token"], 15)
self.assertEqual(len(channel.json_body["media"]), 10)
self._check_fields(channel.json_body["media"])
def test_limit_is_negative(self):
"""
Testing that a negative limit parameter returns a 400
"""
request, channel = self.make_request(
"GET", self.url + "?limit=-5", access_token=self.admin_user_tok,
)
self.render(request)
self.assertEqual(400, int(channel.result["code"]), msg=channel.result["body"])
self.assertEqual(Codes.INVALID_PARAM, channel.json_body["errcode"])
def test_from_is_negative(self):
"""
Testing that a negative from parameter returns a 400
"""
request, channel = self.make_request(
"GET", self.url + "?from=-5", access_token=self.admin_user_tok,
)
self.render(request)
self.assertEqual(400, int(channel.result["code"]), msg=channel.result["body"])
self.assertEqual(Codes.INVALID_PARAM, channel.json_body["errcode"])
def test_next_token(self):
"""
Testing that `next_token` appears at the right place
"""
number_media = 20
other_user_tok = self.login("user", "pass")
self._create_media(other_user_tok, number_media)
# `next_token` does not appear
# Number of results is the number of entries
request, channel = self.make_request(
"GET", self.url + "?limit=20", access_token=self.admin_user_tok,
)
self.render(request)
self.assertEqual(200, int(channel.result["code"]), msg=channel.result["body"])
self.assertEqual(channel.json_body["total"], number_media)
self.assertEqual(len(channel.json_body["media"]), number_media)
self.assertNotIn("next_token", channel.json_body)
# `next_token` does not appear
# Number of max results is larger than the number of entries
request, channel = self.make_request(
"GET", self.url + "?limit=21", access_token=self.admin_user_tok,
)
self.render(request)
self.assertEqual(200, int(channel.result["code"]), msg=channel.result["body"])
self.assertEqual(channel.json_body["total"], number_media)
self.assertEqual(len(channel.json_body["media"]), number_media)
self.assertNotIn("next_token", channel.json_body)
# `next_token` does appear
# Number of max results is smaller than the number of entries
request, channel = self.make_request(
"GET", self.url + "?limit=19", access_token=self.admin_user_tok,
)
self.render(request)
self.assertEqual(200, int(channel.result["code"]), msg=channel.result["body"])
self.assertEqual(channel.json_body["total"], number_media)
self.assertEqual(len(channel.json_body["media"]), 19)
self.assertEqual(channel.json_body["next_token"], 19)
# Check
# Set `from` to value of `next_token` for request remaining entries
# `next_token` does not appear
request, channel = self.make_request(
"GET", self.url + "?from=19", access_token=self.admin_user_tok,
)
self.render(request)
self.assertEqual(200, int(channel.result["code"]), msg=channel.result["body"])
self.assertEqual(channel.json_body["total"], number_media)
self.assertEqual(len(channel.json_body["media"]), 1)
self.assertNotIn("next_token", channel.json_body)
def test_user_has_no_media(self):
"""
Tests that a normal lookup for media is successfully
if user has no media created
"""
request, channel = self.make_request(
"GET", self.url, access_token=self.admin_user_tok,
)
self.render(request)
self.assertEqual(200, channel.code, msg=channel.json_body)
self.assertEqual(0, channel.json_body["total"])
self.assertEqual(0, len(channel.json_body["media"]))
def test_get_media(self):
"""
Tests that a normal lookup for media is successfully
"""
number_media = 5
other_user_tok = self.login("user", "pass")
self._create_media(other_user_tok, number_media)
request, channel = self.make_request(
"GET", self.url, access_token=self.admin_user_tok,
)
self.render(request)
self.assertEqual(200, channel.code, msg=channel.json_body)
self.assertEqual(number_media, channel.json_body["total"])
self.assertEqual(number_media, len(channel.json_body["media"]))
self.assertNotIn("next_token", channel.json_body)
self._check_fields(channel.json_body["media"])
def _create_media(self, user_token, number_media):
"""
Create a number of media for a specific user
"""
upload_resource = self.media_repo.children[b"upload"]
for i in range(number_media):
# file size is 67 Byte
image_data = unhexlify(
b"89504e470d0a1a0a0000000d4948445200000001000000010806"
b"0000001f15c4890000000a49444154789c63000100000500010d"
b"0a2db40000000049454e44ae426082"
)
# Upload some media into the room
self.helper.upload_media(
upload_resource, image_data, tok=user_token, expect_code=200
)
def _check_fields(self, content):
"""Checks that all attributes are present in content
"""
for m in content:
self.assertIn("media_id", m)
self.assertIn("media_type", m)
self.assertIn("media_length", m)
self.assertIn("upload_name", m)
self.assertIn("created_ts", m)
self.assertIn("last_access_ts", m)
self.assertIn("quarantined_by", m)
self.assertIn("safe_from_quarantine", m)

View File

@@ -1,5 +1,5 @@
[tox]
envlist = packaging, py35, py36, py37, py38, check_codestyle, check_isort
envlist = packaging, py35, py36, py37, py38, py39, check_codestyle, check_isort
[base]
extras = test