
Merge commit 'fdf834694' into anoa/dinsic_release_1_31_0

This commit is contained in:
Andrew Morgan
2021-04-22 18:29:56 +01:00
22 changed files with 833 additions and 173 deletions

changelog.d/9062.feature Normal file
View File

@@ -0,0 +1 @@
Add admin API for getting and deleting forward extremities for a room.

changelog.d/9163.bugfix Normal file
View File

@@ -0,0 +1 @@
Fix a long-standing bug where Synapse would return a 500 error when a thumbnail did not exist (and auto-generation of thumbnails was not enabled).

changelog.d/9165.bugfix Normal file
View File

@@ -0,0 +1 @@
Fix a long-standing bug where invalid data could cause errors when calculating the presentable room name for push.

changelog.d/9176.misc Normal file
View File

@@ -0,0 +1 @@
Speed up chain cover calculation when persisting a batch of state events at once.

changelog.d/9190.misc Normal file
View File

@@ -0,0 +1 @@
Improve performance of concurrent use of `StreamIDGenerators`.

changelog.d/9191.misc Normal file
View File

@@ -0,0 +1 @@
Add some missing source directories to the automatic linting script.

View File

@@ -9,6 +9,7 @@
* [Response](#response)
* [Undoing room shutdowns](#undoing-room-shutdowns)
- [Make Room Admin API](#make-room-admin-api)
- [Forward Extremities Admin API](#forward-extremities-admin-api)
# List Room API
@@ -511,3 +512,55 @@ optionally be specified, e.g.:
"user_id": "@foo:example.com"
}
```
# Forward Extremities Admin API
Enables querying and deleting forward extremities from rooms. When a lot of forward
extremities accumulate in a room, performance can become degraded. For details, see
[#1760](https://github.com/matrix-org/synapse/issues/1760).
## Check for forward extremities
To check the status of forward extremities for a room:
```
GET /_synapse/admin/v1/rooms/<room_id_or_alias>/forward_extremities
```
A response like the following will be returned:
```json
{
"count": 1,
"results": [
{
"event_id": "$M5SP266vsnxctfwFgFLNceaCo3ujhRtg_NiiHabcdefgh",
"state_group": 439,
"depth": 123,
"received_ts": 1611263016761
}
]
}
```
## Deleting forward extremities
**WARNING**: Please ensure you know what you're doing and have read
the related issue [#1760](https://github.com/matrix-org/synapse/issues/1760).
Under no circumstances should this API be executed as an automated maintenance task!
If a room has lots of forward extremities, the extra ones can be
deleted as follows:
```
DELETE /_synapse/admin/v1/rooms/<room_id_or_alias>/forward_extremities
```
A response like the following will be returned, indicating the number of forward extremities
that were deleted.
```json
{
"deleted": 1
}
```
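As a rough illustration, both endpoints can be driven from a small Python script; the homeserver URL, admin access token, and room ID below are placeholders, and the `requests` library is assumed:

```python
import urllib.parse

import requests

BASE_URL = "https://synapse.example.com"   # placeholder homeserver URL
ADMIN_TOKEN = "<admin_access_token>"       # placeholder admin access token
ROOM_ID = urllib.parse.quote("!abcdefgh:example.com", safe="")

headers = {"Authorization": f"Bearer {ADMIN_TOKEN}"}

# Count the forward extremities currently accumulated in the room.
resp = requests.get(
    f"{BASE_URL}/_synapse/admin/v1/rooms/{ROOM_ID}/forward_extremities",
    headers=headers,
)
print("count:", resp.json()["count"])

# Deliberate, manual clean-up only: delete the extra forward extremities.
resp = requests.delete(
    f"{BASE_URL}/_synapse/admin/v1/rooms/{ROOM_ID}/forward_extremities",
    headers=headers,
)
print("deleted:", resp.json()["deleted"])
```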

View File

@@ -80,7 +80,8 @@ else
# then lint everything!
if [[ -z ${files+x} ]]; then
# Lint all source code files and directories
files=("synapse" "tests" "scripts-dev" "scripts" "contrib" "synctl" "setup.py" "synmark")
# Note: this list aims to mirror the one in tox.ini
files=("synapse" "docker" "tests" "scripts-dev" "scripts" "contrib" "synctl" "setup.py" "synmark" "stubs" ".buildkite")
fi
fi

View File

@@ -17,7 +17,7 @@ import logging
import re
from typing import TYPE_CHECKING, Dict, Iterable, Optional
from synapse.api.constants import EventTypes
from synapse.api.constants import EventTypes, Membership
from synapse.events import EventBase
from synapse.types import StateMap
@@ -63,7 +63,7 @@ async def calculate_room_name(
m_room_name = await store.get_event(
room_state_ids[(EventTypes.Name, "")], allow_none=True
)
if m_room_name and m_room_name.content and m_room_name.content["name"]:
if m_room_name and m_room_name.content and m_room_name.content.get("name"):
return m_room_name.content["name"]
# does it have a canonical alias?
@@ -74,15 +74,11 @@ async def calculate_room_name(
if (
canon_alias
and canon_alias.content
and canon_alias.content["alias"]
and canon_alias.content.get("alias")
and _looks_like_an_alias(canon_alias.content["alias"])
):
return canon_alias.content["alias"]
# at this point we're going to need to search the state by all state keys
# for an event type, so rearrange the data structure
room_state_bytype_ids = _state_as_two_level_dict(room_state_ids)
if not fallback_to_members:
return None
@@ -94,7 +90,7 @@ async def calculate_room_name(
if (
my_member_event is not None
and my_member_event.content["membership"] == "invite"
and my_member_event.content.get("membership") == Membership.INVITE
):
if (EventTypes.Member, my_member_event.sender) in room_state_ids:
inviter_member_event = await store.get_event(
@@ -111,6 +107,10 @@ async def calculate_room_name(
else:
return "Room Invite"
# at this point we're going to need to search the state by all state keys
# for an event type, so rearrange the data structure
room_state_bytype_ids = _state_as_two_level_dict(room_state_ids)
# we're going to have to generate a name based on who's in the room,
# so find out who is in the room that isn't the user.
if EventTypes.Member in room_state_bytype_ids:
@@ -120,8 +120,8 @@ async def calculate_room_name(
all_members = [
ev
for ev in member_events.values()
if ev.content["membership"] == "join"
or ev.content["membership"] == "invite"
if ev.content.get("membership") == Membership.JOIN
or ev.content.get("membership") == Membership.INVITE
]
# Sort the member events oldest-first so that we name people in the
# order they joined (it should at least be deterministic rather than
@@ -194,11 +194,7 @@ def descriptor_from_member_events(member_events: Iterable[EventBase]) -> str:
def name_from_member_event(member_event: EventBase) -> str:
if (
member_event.content
and "displayname" in member_event.content
and member_event.content["displayname"]
):
if member_event.content and member_event.content.get("displayname"):
return member_event.content["displayname"]
return member_event.state_key
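A quick standalone illustration of why the switch to `.get()` matters (the event content below is a made-up malformed example, not taken from the diff):

```python
content = {}  # malformed m.room.name content with no "name" key

# Direct indexing raises KeyError and aborts the name calculation.
try:
    name = content["name"]
except KeyError:
    name = None

# .get() returns None instead, so the code falls through to the next fallback.
name = content.get("name")
```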

View File

@@ -1,6 +1,8 @@
# -*- coding: utf-8 -*-
# Copyright 2014-2016 OpenMarket Ltd
# Copyright 2018-2019 New Vector Ltd
# Copyright 2020, 2021 The Matrix.org Foundation C.I.C.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@@ -36,6 +38,7 @@ from synapse.rest.admin.media import ListMediaInRoom, register_servlets_for_medi
from synapse.rest.admin.purge_room_servlet import PurgeRoomServlet
from synapse.rest.admin.rooms import (
DeleteRoomRestServlet,
ForwardExtremitiesRestServlet,
JoinRoomAliasServlet,
ListRoomRestServlet,
MakeRoomAdminRestServlet,
@@ -230,6 +233,7 @@ def register_servlets(hs, http_server):
EventReportsRestServlet(hs).register(http_server)
PushersRestServlet(hs).register(http_server)
MakeRoomAdminRestServlet(hs).register(http_server)
ForwardExtremitiesRestServlet(hs).register(http_server)
def register_servlets_for_client_rest_resource(hs, http_server):

View File

@@ -1,5 +1,5 @@
# -*- coding: utf-8 -*-
# Copyright 2019 The Matrix.org Foundation C.I.C.
# Copyright 2019-2021 The Matrix.org Foundation C.I.C.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@@ -498,3 +498,60 @@ class MakeRoomAdminRestServlet(RestServlet):
)
return 200, {}
class ForwardExtremitiesRestServlet(RestServlet):
"""Allows a server admin to get or clear forward extremities.
Clearing does not require restarting the server.
Clear forward extremities:
DELETE /_synapse/admin/v1/rooms/<room_id_or_alias>/forward_extremities
Get forward_extremities:
GET /_synapse/admin/v1/rooms/<room_id_or_alias>/forward_extremities
"""
PATTERNS = admin_patterns("/rooms/(?P<room_identifier>[^/]*)/forward_extremities")
def __init__(self, hs: "HomeServer"):
self.hs = hs
self.auth = hs.get_auth()
self.room_member_handler = hs.get_room_member_handler()
self.store = hs.get_datastore()
async def resolve_room_id(self, room_identifier: str) -> str:
"""Resolve to a room ID, if necessary."""
if RoomID.is_valid(room_identifier):
resolved_room_id = room_identifier
elif RoomAlias.is_valid(room_identifier):
room_alias = RoomAlias.from_string(room_identifier)
room_id, _ = await self.room_member_handler.lookup_room_alias(room_alias)
resolved_room_id = room_id.to_string()
else:
raise SynapseError(
400, "%s was not legal room ID or room alias" % (room_identifier,)
)
if not resolved_room_id:
raise SynapseError(
400, "Unknown room ID or room alias %s" % room_identifier
)
return resolved_room_id
async def on_DELETE(self, request, room_identifier):
requester = await self.auth.get_user_by_req(request)
await assert_user_is_admin(self.auth, requester.user)
room_id = await self.resolve_room_id(room_identifier)
deleted_count = await self.store.delete_forward_extremities_for_room(room_id)
return 200, {"deleted": deleted_count}
async def on_GET(self, request, room_identifier):
requester = await self.auth.get_user_by_req(request)
await assert_user_is_admin(self.auth, requester.user)
room_id = await self.resolve_room_id(room_identifier)
extremities = await self.store.get_forward_extremities_for_room(room_id)
return 200, {"count": len(extremities), "results": extremities}

View File

@@ -300,6 +300,7 @@ class FileInfo:
thumbnail_height (int)
thumbnail_method (str)
thumbnail_type (str): Content type of thumbnail, e.g. image/png
thumbnail_length (int): The size of the thumbnail file, in bytes.
"""
def __init__(
@@ -312,6 +313,7 @@ class FileInfo:
thumbnail_height=None,
thumbnail_method=None,
thumbnail_type=None,
thumbnail_length=None,
):
self.server_name = server_name
self.file_id = file_id
@@ -321,6 +323,7 @@ class FileInfo:
self.thumbnail_height = thumbnail_height
self.thumbnail_method = thumbnail_method
self.thumbnail_type = thumbnail_type
self.thumbnail_length = thumbnail_length
def get_filename_from_headers(headers: Dict[bytes, List[bytes]]) -> Optional[str]:

View File

@@ -16,7 +16,7 @@
import logging
from typing import TYPE_CHECKING
from typing import TYPE_CHECKING, Any, Dict, List, Optional
from twisted.web.http import Request
@@ -106,31 +106,17 @@ class ThumbnailResource(DirectServeJsonResource):
return
thumbnail_infos = await self.store.get_local_media_thumbnails(media_id)
if thumbnail_infos:
thumbnail_info = self._select_thumbnail(
width, height, method, m_type, thumbnail_infos
)
file_info = FileInfo(
server_name=None,
file_id=media_id,
url_cache=media_info["url_cache"],
thumbnail=True,
thumbnail_width=thumbnail_info["thumbnail_width"],
thumbnail_height=thumbnail_info["thumbnail_height"],
thumbnail_type=thumbnail_info["thumbnail_type"],
thumbnail_method=thumbnail_info["thumbnail_method"],
)
t_type = file_info.thumbnail_type
t_length = thumbnail_info["thumbnail_length"]
responder = await self.media_storage.fetch_media(file_info)
await respond_with_responder(request, responder, t_type, t_length)
else:
logger.info("Couldn't find any generated thumbnails")
respond_404(request)
await self._select_and_respond_with_thumbnail(
request,
width,
height,
method,
m_type,
thumbnail_infos,
media_id,
url_cache=media_info["url_cache"],
server_name=None,
)
async def _select_or_generate_local_thumbnail(
self,
@@ -276,26 +262,64 @@ class ThumbnailResource(DirectServeJsonResource):
thumbnail_infos = await self.store.get_remote_media_thumbnails(
server_name, media_id
)
await self._select_and_respond_with_thumbnail(
request,
width,
height,
method,
m_type,
thumbnail_infos,
media_info["filesystem_id"],
url_cache=None,
server_name=server_name,
)
async def _select_and_respond_with_thumbnail(
self,
request: Request,
desired_width: int,
desired_height: int,
desired_method: str,
desired_type: str,
thumbnail_infos: List[Dict[str, Any]],
file_id: str,
url_cache: Optional[str] = None,
server_name: Optional[str] = None,
) -> None:
"""
Respond to a request with an appropriate thumbnail from the previously generated thumbnails.
Args:
request: The incoming request.
desired_width: The desired width, the returned thumbnail may be larger than this.
desired_height: The desired height, the returned thumbnail may be larger than this.
desired_method: The desired method used to generate the thumbnail.
desired_type: The desired content-type of the thumbnail.
thumbnail_infos: A list of dictionaries of candidate thumbnails.
file_id: The ID of the media that a thumbnail is being requested for.
url_cache: The URL cache value.
server_name: The server name, if this is a remote thumbnail.
"""
if thumbnail_infos:
thumbnail_info = self._select_thumbnail(
width, height, method, m_type, thumbnail_infos
file_info = self._select_thumbnail(
desired_width,
desired_height,
desired_method,
desired_type,
thumbnail_infos,
file_id,
url_cache,
server_name,
)
file_info = FileInfo(
server_name=server_name,
file_id=media_info["filesystem_id"],
thumbnail=True,
thumbnail_width=thumbnail_info["thumbnail_width"],
thumbnail_height=thumbnail_info["thumbnail_height"],
thumbnail_type=thumbnail_info["thumbnail_type"],
thumbnail_method=thumbnail_info["thumbnail_method"],
)
t_type = file_info.thumbnail_type
t_length = thumbnail_info["thumbnail_length"]
if not file_info:
logger.info("Couldn't find a thumbnail matching the desired inputs")
respond_404(request)
return
responder = await self.media_storage.fetch_media(file_info)
await respond_with_responder(request, responder, t_type, t_length)
await respond_with_responder(
request, responder, file_info.thumbnail_type, file_info.thumbnail_length
)
else:
logger.info("Failed to find any generated thumbnails")
respond_404(request)
@@ -306,67 +330,117 @@ class ThumbnailResource(DirectServeJsonResource):
desired_height: int,
desired_method: str,
desired_type: str,
thumbnail_infos,
) -> dict:
thumbnail_infos: List[Dict[str, Any]],
file_id: str,
url_cache: Optional[str],
server_name: Optional[str],
) -> Optional[FileInfo]:
"""
Choose an appropriate thumbnail from the previously generated thumbnails.
Args:
desired_width: The desired width, the returned thumbnail may be larger than this.
desired_height: The desired height, the returned thumbnail may be larger than this.
desired_method: The desired method used to generate the thumbnail.
desired_type: The desired content-type of the thumbnail.
thumbnail_infos: A list of dictionaries of candidate thumbnails.
file_id: The ID of the media that a thumbnail is being requested for.
url_cache: The URL cache value.
server_name: The server name, if this is a remote thumbnail.
Returns:
The thumbnail which best matches the desired parameters.
"""
desired_method = desired_method.lower()
# The chosen thumbnail.
thumbnail_info = None
d_w = desired_width
d_h = desired_height
if desired_method.lower() == "crop":
if desired_method == "crop":
# Thumbnails that match equal or larger sizes of desired width/height.
crop_info_list = []
# Other thumbnails.
crop_info_list2 = []
for info in thumbnail_infos:
# Skip thumbnails generated with different methods.
if info["thumbnail_method"] != "crop":
continue
t_w = info["thumbnail_width"]
t_h = info["thumbnail_height"]
t_method = info["thumbnail_method"]
if t_method == "crop":
aspect_quality = abs(d_w * t_h - d_h * t_w)
min_quality = 0 if d_w <= t_w and d_h <= t_h else 1
size_quality = abs((d_w - t_w) * (d_h - t_h))
type_quality = desired_type != info["thumbnail_type"]
length_quality = info["thumbnail_length"]
if t_w >= d_w or t_h >= d_h:
crop_info_list.append(
(
aspect_quality,
min_quality,
size_quality,
type_quality,
length_quality,
info,
)
)
else:
crop_info_list2.append(
(
aspect_quality,
min_quality,
size_quality,
type_quality,
length_quality,
info,
)
)
if crop_info_list:
return min(crop_info_list)[-1]
else:
return min(crop_info_list2)[-1]
else:
info_list = []
info_list2 = []
for info in thumbnail_infos:
t_w = info["thumbnail_width"]
t_h = info["thumbnail_height"]
t_method = info["thumbnail_method"]
aspect_quality = abs(d_w * t_h - d_h * t_w)
min_quality = 0 if d_w <= t_w and d_h <= t_h else 1
size_quality = abs((d_w - t_w) * (d_h - t_h))
type_quality = desired_type != info["thumbnail_type"]
length_quality = info["thumbnail_length"]
if t_method == "scale" and (t_w >= d_w or t_h >= d_h):
if t_w >= d_w or t_h >= d_h:
crop_info_list.append(
(
aspect_quality,
min_quality,
size_quality,
type_quality,
length_quality,
info,
)
)
else:
crop_info_list2.append(
(
aspect_quality,
min_quality,
size_quality,
type_quality,
length_quality,
info,
)
)
if crop_info_list:
thumbnail_info = min(crop_info_list)[-1]
elif crop_info_list2:
thumbnail_info = min(crop_info_list2)[-1]
elif desired_method == "scale":
# Thumbnails that match equal or larger sizes of desired width/height.
info_list = []
# Other thumbnails.
info_list2 = []
for info in thumbnail_infos:
# Skip thumbnails generated with different methods.
if info["thumbnail_method"] != "scale":
continue
t_w = info["thumbnail_width"]
t_h = info["thumbnail_height"]
size_quality = abs((d_w - t_w) * (d_h - t_h))
type_quality = desired_type != info["thumbnail_type"]
length_quality = info["thumbnail_length"]
if t_w >= d_w or t_h >= d_h:
info_list.append((size_quality, type_quality, length_quality, info))
elif t_method == "scale":
else:
info_list2.append(
(size_quality, type_quality, length_quality, info)
)
if info_list:
return min(info_list)[-1]
else:
return min(info_list2)[-1]
thumbnail_info = min(info_list)[-1]
elif info_list2:
thumbnail_info = min(info_list2)[-1]
if thumbnail_info:
return FileInfo(
file_id=file_id,
url_cache=url_cache,
server_name=server_name,
thumbnail=True,
thumbnail_width=thumbnail_info["thumbnail_width"],
thumbnail_height=thumbnail_info["thumbnail_height"],
thumbnail_type=thumbnail_info["thumbnail_type"],
thumbnail_method=thumbnail_info["thumbnail_method"],
thumbnail_length=thumbnail_info["thumbnail_length"],
)
# No matching thumbnail was found.
return None
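The ranking above works because Python compares tuples element by element, so `min()` prefers the best aspect-ratio match, then whether the candidate is large enough, then size closeness, content type, and finally file size. A minimal standalone sketch of that idea, with made-up candidate records:

```python
desired_w, desired_h, desired_type = 64, 64, "image/png"

# Made-up candidate thumbnails (the real records come from the media store).
candidates = [
    {"thumbnail_width": 32, "thumbnail_height": 32,
     "thumbnail_type": "image/jpeg", "thumbnail_length": 900},
    {"thumbnail_width": 96, "thumbnail_height": 96,
     "thumbnail_type": "image/png", "thumbnail_length": 4000},
]

ranked = [
    (
        abs(desired_w * c["thumbnail_height"] - desired_h * c["thumbnail_width"]),  # aspect match
        0 if desired_w <= c["thumbnail_width"] and desired_h <= c["thumbnail_height"] else 1,
        abs((desired_w - c["thumbnail_width"]) * (desired_h - c["thumbnail_height"])),  # size closeness
        desired_type != c["thumbnail_type"],  # content-type mismatch penalty
        c["thumbnail_length"],  # smaller files win ties
        c,
    )
    for c in candidates
]

best = min(ranked)[-1]
print(best["thumbnail_width"])  # 96: the large-enough image/png candidate wins
```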

View File

@@ -1,7 +1,7 @@
# -*- coding: utf-8 -*-
# Copyright 2014-2016 OpenMarket Ltd
# Copyright 2018 New Vector Ltd
# Copyright 2019 The Matrix.org Foundation C.I.C.
# Copyright 2019-2021 The Matrix.org Foundation C.I.C.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@@ -43,6 +43,7 @@ from .end_to_end_keys import EndToEndKeyStore
from .event_federation import EventFederationStore
from .event_push_actions import EventPushActionsStore
from .events_bg_updates import EventsBackgroundUpdatesStore
from .events_forward_extremities import EventForwardExtremitiesStore
from .filtering import FilteringStore
from .group_server import GroupServerStore
from .keys import KeyStore
@@ -118,6 +119,7 @@ class DataStore(
UIAuthStore,
CacheInvalidationWorkerStore,
ServerMetricsStore,
EventForwardExtremitiesStore,
):
def __init__(self, database: DatabasePool, db_conn, hs):
self.hs = hs

View File

@@ -473,8 +473,9 @@ class PersistEventsStore:
txn, self.db_pool, event_to_room_id, event_to_types, event_to_auth_chain,
)
@staticmethod
@classmethod
def _add_chain_cover_index(
cls,
txn,
db_pool: DatabasePool,
event_to_room_id: Dict[str, str],
@@ -614,60 +615,17 @@ class PersistEventsStore:
if not events_to_calc_chain_id_for:
return
# We now calculate the chain IDs/sequence numbers for the events. We
# do this by looking at the chain ID and sequence number of any auth
# event with the same type/state_key and incrementing the sequence
# number by one. If there was no match or the chain ID/sequence
# number is already taken we generate a new chain.
#
# We need to do this in a topologically sorted order as we want to
# generate chain IDs/sequence numbers of an event's auth events
# before the event itself.
chains_tuples_allocated = set() # type: Set[Tuple[int, int]]
new_chain_tuples = {} # type: Dict[str, Tuple[int, int]]
for event_id in sorted_topologically(
events_to_calc_chain_id_for, event_to_auth_chain
):
existing_chain_id = None
for auth_id in event_to_auth_chain.get(event_id, []):
if event_to_types.get(event_id) == event_to_types.get(auth_id):
existing_chain_id = chain_map[auth_id]
break
new_chain_tuple = None
if existing_chain_id:
# We found a chain ID/sequence number candidate, check it's
# not already taken.
proposed_new_id = existing_chain_id[0]
proposed_new_seq = existing_chain_id[1] + 1
if (proposed_new_id, proposed_new_seq) not in chains_tuples_allocated:
already_allocated = db_pool.simple_select_one_onecol_txn(
txn,
table="event_auth_chains",
keyvalues={
"chain_id": proposed_new_id,
"sequence_number": proposed_new_seq,
},
retcol="event_id",
allow_none=True,
)
if already_allocated:
# Mark it as already allocated so we don't need to hit
# the DB again.
chains_tuples_allocated.add((proposed_new_id, proposed_new_seq))
else:
new_chain_tuple = (
proposed_new_id,
proposed_new_seq,
)
if not new_chain_tuple:
new_chain_tuple = (db_pool.event_chain_id_gen.get_next_id_txn(txn), 1)
chains_tuples_allocated.add(new_chain_tuple)
chain_map[event_id] = new_chain_tuple
new_chain_tuples[event_id] = new_chain_tuple
# Allocate chain ID/sequence numbers to each new event.
new_chain_tuples = cls._allocate_chain_ids(
txn,
db_pool,
event_to_room_id,
event_to_types,
event_to_auth_chain,
events_to_calc_chain_id_for,
chain_map,
)
chain_map.update(new_chain_tuples)
db_pool.simple_insert_many_txn(
txn,
@@ -794,6 +752,137 @@ class PersistEventsStore:
],
)
@staticmethod
def _allocate_chain_ids(
txn,
db_pool: DatabasePool,
event_to_room_id: Dict[str, str],
event_to_types: Dict[str, Tuple[str, str]],
event_to_auth_chain: Dict[str, List[str]],
events_to_calc_chain_id_for: Set[str],
chain_map: Dict[str, Tuple[int, int]],
) -> Dict[str, Tuple[int, int]]:
"""Allocates, but does not persist, chain ID/sequence numbers for the
events in `events_to_calc_chain_id_for`. (c.f. _add_chain_cover_index
for info on args)
"""
# We now calculate the chain IDs/sequence numbers for the events. We do
# this by looking at the chain ID and sequence number of any auth event
# with the same type/state_key and incrementing the sequence number by
# one. If there was no match or the chain ID/sequence number is already
# taken we generate a new chain.
#
# We try to reduce the number of times that we hit the database by
# batching up calls, to make this more efficient when persisting large
# numbers of state events (e.g. during joins).
#
# We do this by:
# 1. Calculating for each event which auth event will be used to
# inherit the chain ID, i.e. converting the auth chain graph to a
# tree that we can allocate chains on. We also keep track of which
# existing chain IDs have been referenced.
# 2. Fetching the max allocated sequence number for each referenced
# existing chain ID, generating a map from chain ID to the max
# allocated sequence number.
# 3. Iterating over the tree and allocating a chain ID/seq no. to the
# new event, by incrementing the sequence number from the
# referenced event's chain ID/seq no. and checking that the
# incremented sequence number hasn't already been allocated (by
# looking in the map generated in the previous step). We generate a
# new chain if the sequence number has already been allocated.
#
existing_chains = set() # type: Set[int]
tree = [] # type: List[Tuple[str, Optional[str]]]
# We need to do this in a topologically sorted order as we want to
# generate chain IDs/sequence numbers of an event's auth events before
# the event itself.
for event_id in sorted_topologically(
events_to_calc_chain_id_for, event_to_auth_chain
):
for auth_id in event_to_auth_chain.get(event_id, []):
if event_to_types.get(event_id) == event_to_types.get(auth_id):
existing_chain_id = chain_map.get(auth_id)
if existing_chain_id:
existing_chains.add(existing_chain_id[0])
tree.append((event_id, auth_id))
break
else:
tree.append((event_id, None))
# Fetch the current max sequence number for each existing referenced chain.
sql = """
SELECT chain_id, MAX(sequence_number) FROM event_auth_chains
WHERE %s
GROUP BY chain_id
"""
clause, args = make_in_list_sql_clause(
db_pool.engine, "chain_id", existing_chains
)
txn.execute(sql % (clause,), args)
chain_to_max_seq_no = {row[0]: row[1] for row in txn} # type: Dict[Any, int]
# Allocate the new events chain ID/sequence numbers.
#
# To reduce the number of calls to the database we don't allocate a
# chain ID number in the loop, instead we use a temporary `object()` for
# each new chain ID. Once we've done the loop we generate the necessary
# number of new chain IDs in one call, replacing all temporary
# objects with real allocated chain IDs.
unallocated_chain_ids = set() # type: Set[object]
new_chain_tuples = {} # type: Dict[str, Tuple[Any, int]]
for event_id, auth_event_id in tree:
# If we reference an auth_event_id we fetch the allocated chain ID,
# either from the existing `chain_map` or the newly generated
# `new_chain_tuples` map.
existing_chain_id = None
if auth_event_id:
existing_chain_id = new_chain_tuples.get(auth_event_id)
if not existing_chain_id:
existing_chain_id = chain_map[auth_event_id]
new_chain_tuple = None # type: Optional[Tuple[Any, int]]
if existing_chain_id:
# We found a chain ID/sequence number candidate, check it's
# not already taken.
proposed_new_id = existing_chain_id[0]
proposed_new_seq = existing_chain_id[1] + 1
if chain_to_max_seq_no[proposed_new_id] < proposed_new_seq:
new_chain_tuple = (
proposed_new_id,
proposed_new_seq,
)
# If we need to start a new chain we allocate a temporary chain ID.
if not new_chain_tuple:
new_chain_tuple = (object(), 1)
unallocated_chain_ids.add(new_chain_tuple[0])
new_chain_tuples[event_id] = new_chain_tuple
chain_to_max_seq_no[new_chain_tuple[0]] = new_chain_tuple[1]
# Generate new chain IDs for all unallocated chain IDs.
newly_allocated_chain_ids = db_pool.event_chain_id_gen.get_next_mult_txn(
txn, len(unallocated_chain_ids)
)
# Map from potentially temporary chain ID to real chain ID
chain_id_to_allocated_map = dict(
zip(unallocated_chain_ids, newly_allocated_chain_ids)
) # type: Dict[Any, int]
chain_id_to_allocated_map.update((c, c) for c in existing_chains)
return {
event_id: (chain_id_to_allocated_map[chain_id], seq)
for event_id, (chain_id, seq) in new_chain_tuples.items()
}
def _persist_transaction_ids_txn(
self,
txn: LoggingTransaction,
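To isolate the placeholder trick described in the comments (a throwaway `object()` per new chain while looping, then one bulk allocation at the end), here is a rough standalone sketch; `allocate_ids` is a hypothetical stand-in for `event_chain_id_gen.get_next_mult_txn`:

```python
from itertools import count
from typing import Any, Dict, List, Tuple

_next_id = count(1000)

def allocate_ids(n: int) -> List[int]:
    """Stand-in for the single bulk call that hands out n fresh chain IDs."""
    return [next(_next_id) for _ in range(n)]

# While looping over events we only decide *that* a new chain is needed,
# keyed by a sentinel object, without touching the database.
placeholders = [object(), object()]
provisional: Dict[str, Tuple[Any, int]] = {
    "$event1": (placeholders[0], 1),  # needs a brand new chain
    "$event2": (7, 4),                # extends existing chain 7
    "$event3": (placeholders[1], 1),  # needs another new chain
}

# One bulk allocation afterwards maps every sentinel to a real chain ID.
chain_id_map: Dict[Any, int] = dict(zip(placeholders, allocate_ids(len(placeholders))))
chain_id_map.update({7: 7})  # existing chain IDs map to themselves

resolved = {
    event_id: (chain_id_map[chain], seq)
    for event_id, (chain, seq) in provisional.items()
}
print(resolved)  # {'$event1': (1000, 1), '$event2': (7, 4), '$event3': (1001, 1)}
```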

View File

@@ -0,0 +1,101 @@
# -*- coding: utf-8 -*-
# Copyright 2021 The Matrix.org Foundation C.I.C.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import logging
from typing import Dict, List
from synapse.api.errors import SynapseError
from synapse.storage._base import SQLBaseStore
logger = logging.getLogger(__name__)
class EventForwardExtremitiesStore(SQLBaseStore):
async def delete_forward_extremities_for_room(self, room_id: str) -> int:
"""Delete any extra forward extremities for a room.
Invalidates the "get_latest_event_ids_in_room" cache if any forward
extremities were deleted.
Returns the number of forward extremities deleted.
"""
def delete_forward_extremities_for_room_txn(txn):
# First we need to get the event_id to not delete
sql = """
SELECT event_id FROM event_forward_extremities
INNER JOIN events USING (room_id, event_id)
WHERE room_id = ?
ORDER BY stream_ordering DESC
LIMIT 1
"""
txn.execute(sql, (room_id,))
rows = txn.fetchall()
try:
event_id = rows[0][0]
logger.debug(
"Found event_id %s as the forward extremity to keep for room %s",
event_id,
room_id,
)
except IndexError:
msg = "No forward extremity event found for room %s" % room_id
logger.warning(msg)
raise SynapseError(400, msg)
# Now delete the extra forward extremities
sql = """
DELETE FROM event_forward_extremities
WHERE event_id != ? AND room_id = ?
"""
txn.execute(sql, (event_id, room_id))
logger.info(
"Deleted %s extra forward extremities for room %s",
txn.rowcount,
room_id,
)
if txn.rowcount > 0:
# Invalidate the cache
self._invalidate_cache_and_stream(
txn, self.get_latest_event_ids_in_room, (room_id,),
)
return txn.rowcount
return await self.db_pool.runInteraction(
"delete_forward_extremities_for_room",
delete_forward_extremities_for_room_txn,
)
async def get_forward_extremities_for_room(self, room_id: str) -> List[Dict]:
"""Get list of forward extremities for a room."""
def get_forward_extremities_for_room_txn(txn):
sql = """
SELECT event_id, state_group, depth, received_ts
FROM event_forward_extremities
NATURAL JOIN event_to_state_groups
NATURAL JOIN events
WHERE room_id = ?
"""
txn.execute(sql, (room_id,))
return self.db_pool.cursor_to_dict(txn)
return await self.db_pool.runInteraction(
"get_forward_extremities_for_room", get_forward_extremities_for_room_txn,
)

View File

@@ -15,12 +15,11 @@
import heapq
import logging
import threading
from collections import deque
from collections import OrderedDict
from contextlib import contextmanager
from typing import Dict, List, Optional, Set, Tuple, Union
import attr
from typing_extensions import Deque
from synapse.metrics.background_process_metrics import run_as_background_process
from synapse.storage.database import DatabasePool, LoggingTransaction
@@ -101,7 +100,13 @@ class StreamIdGenerator:
self._current = (max if step > 0 else min)(
self._current, _load_current_id(db_conn, table, column, step)
)
self._unfinished_ids = deque() # type: Deque[int]
# We use this as an ordered set, as we want to efficiently append items,
# remove items and get the first item. Since we insert IDs in order, the
# insertion ordering will ensure it's in the correct order.
#
# The key and values are the same, but we never look at the values.
self._unfinished_ids = OrderedDict() # type: OrderedDict[int, int]
def get_next(self):
"""
@@ -113,7 +118,7 @@ class StreamIdGenerator:
self._current += self._step
next_id = self._current
self._unfinished_ids.append(next_id)
self._unfinished_ids[next_id] = next_id
@contextmanager
def manager():
@@ -121,7 +126,7 @@ class StreamIdGenerator:
yield next_id
finally:
with self._lock:
self._unfinished_ids.remove(next_id)
self._unfinished_ids.pop(next_id)
return _AsyncCtxManagerWrapper(manager())
@@ -140,7 +145,7 @@ class StreamIdGenerator:
self._current += n * self._step
for next_id in next_ids:
self._unfinished_ids.append(next_id)
self._unfinished_ids[next_id] = next_id
@contextmanager
def manager():
@@ -149,7 +154,7 @@ class StreamIdGenerator:
finally:
with self._lock:
for next_id in next_ids:
self._unfinished_ids.remove(next_id)
self._unfinished_ids.pop(next_id)
return _AsyncCtxManagerWrapper(manager())
@@ -162,7 +167,7 @@ class StreamIdGenerator:
"""
with self._lock:
if self._unfinished_ids:
return self._unfinished_ids[0] - self._step
return next(iter(self._unfinished_ids)) - self._step
return self._current
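Since Python has no built-in ordered set, the change leans on `OrderedDict` insertion ordering; a minimal standalone sketch of the pattern (not the generator itself):

```python
from collections import OrderedDict

unfinished = OrderedDict()  # type: OrderedDict[int, int]

# "Append": stream IDs are handed out, and therefore inserted, in order.
for stream_id in (10, 11, 12):
    unfinished[stream_id] = stream_id

# O(1) removal of an arbitrary in-flight ID once its work completes.
unfinished.pop(11)

# The oldest still-unfinished ID is simply the first key.
print(next(iter(unfinished)))  # 10
```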

View File

@@ -69,6 +69,11 @@ class SequenceGenerator(metaclass=abc.ABCMeta):
"""Gets the next ID in the sequence"""
...
@abc.abstractmethod
def get_next_mult_txn(self, txn: Cursor, n: int) -> List[int]:
"""Get the next `n` IDs in the sequence"""
...
@abc.abstractmethod
def check_consistency(
self,
@@ -219,6 +224,17 @@ class LocalSequenceGenerator(SequenceGenerator):
self._current_max_id += 1
return self._current_max_id
def get_next_mult_txn(self, txn: Cursor, n: int) -> List[int]:
with self._lock:
if self._current_max_id is None:
assert self._callback is not None
self._current_max_id = self._callback(txn)
self._callback = None
first_id = self._current_max_id + 1
self._current_max_id += n
return [first_id + i for i in range(n)]
def check_consistency(
self,
db_conn: Connection,

View File

@@ -0,0 +1,229 @@
# Copyright 2021 The Matrix.org Foundation C.I.C.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from typing import Iterable, Optional, Tuple
from synapse.api.constants import EventTypes, Membership
from synapse.api.room_versions import RoomVersions
from synapse.events import FrozenEvent
from synapse.push.presentable_names import calculate_room_name
from synapse.types import StateKey, StateMap
from tests import unittest
class MockDataStore:
"""
A fake data store which stores a mapping of state key to event content.
(I.e. the state key is used as the event ID.)
"""
def __init__(self, events: Iterable[Tuple[StateKey, dict]]):
"""
Args:
events: A state map to event contents.
"""
self._events = {}
for i, (event_id, content) in enumerate(events):
self._events[event_id] = FrozenEvent(
{
"event_id": "$event_id",
"type": event_id[0],
"sender": "@user:test",
"state_key": event_id[1],
"room_id": "#room:test",
"content": content,
"origin_server_ts": i,
},
RoomVersions.V1,
)
async def get_event(
self, event_id: StateKey, allow_none: bool = False
) -> Optional[FrozenEvent]:
assert allow_none, "Mock not configured for allow_none = False"
return self._events.get(event_id)
async def get_events(self, event_ids: Iterable[StateKey]):
# This is cheating since it just returns all events.
return self._events
class PresentableNamesTestCase(unittest.HomeserverTestCase):
USER_ID = "@test:test"
OTHER_USER_ID = "@user:test"
def _calculate_room_name(
self,
events: StateMap[dict],
user_id: str = "",
fallback_to_members: bool = True,
fallback_to_single_member: bool = True,
):
# This isn't 100% accurate, but works with MockDataStore.
room_state_ids = {k[0]: k[0] for k in events}
return self.get_success(
calculate_room_name(
MockDataStore(events),
room_state_ids,
user_id or self.USER_ID,
fallback_to_members,
fallback_to_single_member,
)
)
def test_name(self):
"""A room name event should be used."""
events = [
((EventTypes.Name, ""), {"name": "test-name"}),
]
self.assertEqual("test-name", self._calculate_room_name(events))
# Check if the event content has garbage.
events = [((EventTypes.Name, ""), {"foo": 1})]
self.assertEqual("Empty Room", self._calculate_room_name(events))
events = [((EventTypes.Name, ""), {"name": 1})]
self.assertEqual(1, self._calculate_room_name(events))
def test_canonical_alias(self):
"""An canonical alias should be used."""
events = [
((EventTypes.CanonicalAlias, ""), {"alias": "#test-name:test"}),
]
self.assertEqual("#test-name:test", self._calculate_room_name(events))
# Check if the event content has garbage.
events = [((EventTypes.CanonicalAlias, ""), {"foo": 1})]
self.assertEqual("Empty Room", self._calculate_room_name(events))
events = [((EventTypes.CanonicalAlias, ""), {"alias": "test-name"})]
self.assertEqual("Empty Room", self._calculate_room_name(events))
def test_invite(self):
"""An invite has special behaviour."""
events = [
((EventTypes.Member, self.USER_ID), {"membership": Membership.INVITE}),
((EventTypes.Member, self.OTHER_USER_ID), {"displayname": "Other User"}),
]
self.assertEqual("Invite from Other User", self._calculate_room_name(events))
self.assertIsNone(
self._calculate_room_name(events, fallback_to_single_member=False)
)
# Ensure this logic is skipped if we don't fallback to members.
self.assertIsNone(self._calculate_room_name(events, fallback_to_members=False))
# Check if the event content has garbage.
events = [
((EventTypes.Member, self.USER_ID), {"membership": Membership.INVITE}),
((EventTypes.Member, self.OTHER_USER_ID), {"foo": 1}),
]
self.assertEqual("Invite from @user:test", self._calculate_room_name(events))
# No member event for sender.
events = [
((EventTypes.Member, self.USER_ID), {"membership": Membership.INVITE}),
]
self.assertEqual("Room Invite", self._calculate_room_name(events))
def test_no_members(self):
"""Behaviour of an empty room."""
events = []
self.assertEqual("Empty Room", self._calculate_room_name(events))
# Note that events with invalid (or missing) membership are ignored.
events = [
((EventTypes.Member, self.OTHER_USER_ID), {"foo": 1}),
((EventTypes.Member, "@foo:test"), {"membership": "foo"}),
]
self.assertEqual("Empty Room", self._calculate_room_name(events))
def test_no_other_members(self):
"""Behaviour of a room with no other members in it."""
events = [
(
(EventTypes.Member, self.USER_ID),
{"membership": Membership.JOIN, "displayname": "Me"},
),
]
self.assertEqual("Me", self._calculate_room_name(events))
# Check if the event content has no displayname.
events = [
((EventTypes.Member, self.USER_ID), {"membership": Membership.JOIN}),
]
self.assertEqual("@test:test", self._calculate_room_name(events))
# 3pid invite, use the other user (who is set as the sender).
events = [
((EventTypes.Member, self.OTHER_USER_ID), {"membership": Membership.JOIN}),
]
self.assertEqual(
"nobody", self._calculate_room_name(events, user_id=self.OTHER_USER_ID)
)
events = [
((EventTypes.Member, self.OTHER_USER_ID), {"membership": Membership.JOIN}),
((EventTypes.ThirdPartyInvite, self.OTHER_USER_ID), {}),
]
self.assertEqual(
"Inviting email address",
self._calculate_room_name(events, user_id=self.OTHER_USER_ID),
)
def test_one_other_member(self):
"""Behaviour of a room with a single other member."""
events = [
((EventTypes.Member, self.USER_ID), {"membership": Membership.JOIN}),
(
(EventTypes.Member, self.OTHER_USER_ID),
{"membership": Membership.JOIN, "displayname": "Other User"},
),
]
self.assertEqual("Other User", self._calculate_room_name(events))
self.assertIsNone(
self._calculate_room_name(events, fallback_to_single_member=False)
)
# Check if the event content has no displayname and is an invite.
events = [
((EventTypes.Member, self.USER_ID), {"membership": Membership.JOIN}),
(
(EventTypes.Member, self.OTHER_USER_ID),
{"membership": Membership.INVITE},
),
]
self.assertEqual("@user:test", self._calculate_room_name(events))
def test_other_members(self):
"""Behaviour of a room with multiple other members."""
# Two other members.
events = [
((EventTypes.Member, self.USER_ID), {"membership": Membership.JOIN}),
(
(EventTypes.Member, self.OTHER_USER_ID),
{"membership": Membership.JOIN, "displayname": "Other User"},
),
((EventTypes.Member, "@foo:test"), {"membership": Membership.JOIN}),
]
self.assertEqual("Other User and @foo:test", self._calculate_room_name(events))
# Three or more other members.
events.append(
((EventTypes.Member, "@fourth:test"), {"membership": Membership.INVITE})
)
self.assertEqual("Other User and 2 others", self._calculate_room_name(events))

View File

@@ -29,7 +29,7 @@ class PushRuleEvaluatorTestCase(unittest.TestCase):
"type": "m.room.history_visibility",
"sender": "@user:test",
"state_key": "",
"room_id": "@room:test",
"room_id": "#room:test",
"content": content,
},
RoomVersions.V1,

View File

@@ -202,7 +202,6 @@ class MediaRepoTests(unittest.HomeserverTestCase):
config = self.default_config()
config["media_store_path"] = self.media_store_path
config["thumbnail_requirements"] = {}
config["max_image_pixels"] = 2000000
provider_config = {
@@ -313,15 +312,39 @@ class MediaRepoTests(unittest.HomeserverTestCase):
self.assertEqual(headers.getRawHeaders(b"Content-Disposition"), None)
def test_thumbnail_crop(self):
"""Test that a cropped remote thumbnail is available."""
self._test_thumbnail(
"crop", self.test_image.expected_cropped, self.test_image.expected_found
)
def test_thumbnail_scale(self):
"""Test that a scaled remote thumbnail is available."""
self._test_thumbnail(
"scale", self.test_image.expected_scaled, self.test_image.expected_found
)
def test_invalid_type(self):
"""An invalid thumbnail type is never available."""
self._test_thumbnail("invalid", None, False)
@unittest.override_config(
{"thumbnail_sizes": [{"width": 32, "height": 32, "method": "scale"}]}
)
def test_no_thumbnail_crop(self):
"""
Override the config to generate only scaled thumbnails, but request a cropped one.
"""
self._test_thumbnail("crop", None, False)
@unittest.override_config(
{"thumbnail_sizes": [{"width": 32, "height": 32, "method": "crop"}]}
)
def test_no_thumbnail_scale(self):
"""
Override the config to generate only cropped thumbnails, but request a scaled one.
"""
self._test_thumbnail("scale", None, False)
def _test_thumbnail(self, method, expected_body, expected_found):
params = "?width=32&height=32&method=" + method
channel = make_request(

View File

@@ -26,7 +26,8 @@ deps =
pip>=10 ; python_version >= '3.6'
pip>=10,<21.0 ; python_version < '3.6'
# directories/files we run the linters on
# directories/files we run the linters on.
# if you update this list, make sure to do the same in scripts-dev/lint.sh
lint_targets =
setup.py
synapse