Merge commit '8238b55e0' into anoa/dinsic_release_1_21_x
* commit '8238b55e0':
  Update description of server_name config option (#8415)
  Discard an empty upload_name before persisting an uploaded file (#7905)
  Don't table scan events on worker startup (#8419)
  Mypy fixes for `synapse.handlers.federation` (#8422)
changelog.d/7905.bugfix (new file)
@@ -0,0 +1 @@
+Fix a longstanding bug when storing a media file with an empty `upload_name`.
changelog.d/8415.doc (new file)
@@ -0,0 +1 @@
+Improve description of the `server_name` config option in `homeserver.yaml`.
changelog.d/8419.feature (new file)
@@ -0,0 +1 @@
+Add experimental support for sharding event persister.
changelog.d/8422.misc (new file)
@@ -0,0 +1 @@
+Typing fixes for `synapse.handlers.federation`.
@@ -33,10 +33,23 @@
## Server ##

-# The domain name of the server, with optional explicit port.
-# This is used by remote servers to connect to this server,
-# e.g. matrix.org, localhost:8080, etc.
-# This is also the last part of your UserID.
+# The public-facing domain of the server
+#
+# The server_name name will appear at the end of usernames and room addresses
+# created on this server. For example if the server_name was example.com,
+# usernames on this server would be in the format @user:example.com
+#
+# In most cases you should avoid using a matrix specific subdomain such as
+# matrix.example.com or synapse.example.com as the server_name for the same
+# reasons you wouldn't use user@email.example.com as your email address.
+# See https://github.com/matrix-org/synapse/blob/master/docs/delegate.md
+# for information on how to host Synapse on a subdomain while preserving
+# a clean server_name.
+#
+# The server_name cannot be changed later so it is important to
+# configure this correctly before you start Synapse. It should be all
+# lowercase and may contain an explicit port.
+# Examples: matrix.org, localhost:8080
#
server_name: "SERVERNAME"
@@ -647,10 +647,23 @@ class ServerConfig(Config):
        """\
        ## Server ##

-       # The domain name of the server, with optional explicit port.
-       # This is used by remote servers to connect to this server,
-       # e.g. matrix.org, localhost:8080, etc.
-       # This is also the last part of your UserID.
+       # The public-facing domain of the server
+       #
+       # The server_name name will appear at the end of usernames and room addresses
+       # created on this server. For example if the server_name was example.com,
+       # usernames on this server would be in the format @user:example.com
+       #
+       # In most cases you should avoid using a matrix specific subdomain such as
+       # matrix.example.com or synapse.example.com as the server_name for the same
+       # reasons you wouldn't use user@email.example.com as your email address.
+       # See https://github.com/matrix-org/synapse/blob/master/docs/delegate.md
+       # for information on how to host Synapse on a subdomain while preserving
+       # a clean server_name.
+       #
+       # The server_name cannot be changed later so it is important to
+       # configure this correctly before you start Synapse. It should be all
+       # lowercase and may contain an explicit port.
+       # Examples: matrix.org, localhost:8080
        #
        server_name: "%(server_name)s"
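As a side note on the new wording: the only place `server_name` really surfaces to users is as the domain half of the identifiers minted on this homeserver. A minimal sketch of that relationship, using hypothetical helper names that are not part of Synapse:

    # Illustrative helpers only; Synapse has its own identifier types.
    def make_user_id(localpart: str, server_name: str) -> str:
        # e.g. make_user_id("alice", "example.com") -> "@alice:example.com"
        return "@%s:%s" % (localpart, server_name)

    def make_room_alias(alias_localpart: str, server_name: str) -> str:
        # e.g. make_room_alias("general", "example.com") -> "#general:example.com"
        return "#%s:%s" % (alias_localpart, server_name)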
@@ -24,10 +24,12 @@ from typing import (
    Dict,
    Iterable,
    List,
    Mapping,
    Optional,
    Sequence,
    Tuple,
    TypeVar,
    Union,
)

from prometheus_client import Counter
@@ -501,7 +503,7 @@ class FederationClient(FederationBase):
        user_id: str,
        membership: str,
        content: dict,
-       params: Dict[str, str],
+       params: Optional[Mapping[str, Union[str, Iterable[str]]]],
    ) -> Tuple[str, EventBase, RoomVersion]:
        """
        Creates an m.room.member event, with context, without participating in the room.
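The widened annotation says that `make_membership_event` now tolerates the same shape of query parameters the federation HTTP client accepts: values may be single strings or iterables of strings (for repeated parameters), and the whole mapping may be absent. A hedged sketch of values satisfying the new annotation; the keys here are purely illustrative:

    from typing import Iterable, Mapping, Optional, Union

    QueryParams = Optional[Mapping[str, Union[str, Iterable[str]]]]

    params_single: QueryParams = {"ver": "6"}            # one value per key
    params_repeated: QueryParams = {"ver": ["5", "6"]}   # repeated query parameter
    params_absent: QueryParams = None                    # no parameters at all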
@@ -155,8 +155,9 @@ class FederationHandler(BaseHandler):
        self._device_list_updater = hs.get_device_handler().device_list_updater
        self._maybe_store_room_on_invite = self.store.maybe_store_room_on_invite

-       # When joining a room we need to queue any events for that room up
-       self.room_queues = {}
+       # When joining a room we need to queue any events for that room up.
+       # For each room, a list of (pdu, origin) tuples.
+       self.room_queues = {}  # type: Dict[str, List[Tuple[EventBase, str]]]
        self._room_pdu_linearizer = Linearizer("fed_room_pdu")

        self.third_party_event_rules = hs.get_third_party_event_rules()
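The new type comment pins down what actually goes into `room_queues`: while a join for a room is in flight, each incoming PDU is parked together with the server it came from. A rough sketch of that pattern, using a plain object in place of `EventBase`:

    from typing import Dict, List, Tuple

    room_queues: Dict[str, List[Tuple[object, str]]] = {}

    def queue_pdu(room_id: str, pdu: object, origin: str) -> None:
        # Park the event until the in-progress join for this room completes.
        room_queues.setdefault(room_id, []).append((pdu, origin))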
@@ -817,6 +818,9 @@ class FederationHandler(BaseHandler):
            dest, room_id, limit=limit, extremities=extremities
        )

+       if not events:
+           return []
+
        # ideally we'd sanity check the events here for excess prev_events etc,
        # but it's hard to reject events at this point without completely
        # breaking backfill in the same way that it is currently broken by
@@ -2174,10 +2178,10 @@ class FederationHandler(BaseHandler):
            # given state at the event. This should correctly handle cases
            # like bans, especially with state res v2.

-           state_sets = await self.state_store.get_state_groups(
+           state_sets_d = await self.state_store.get_state_groups(
                event.room_id, extrem_ids
            )
-           state_sets = list(state_sets.values())
+           state_sets = list(state_sets_d.values())  # type: List[Iterable[EventBase]]
            state_sets.append(state)
            current_states = await self.state_handler.resolve_events(
                room_version, state_sets, event
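Renaming the intermediate dictionary to `state_sets_d` is a typing convenience rather than a behaviour change: rebinding one name first to the dict returned by `get_state_groups` and then to a list of its values defeats mypy's inference, so the dict gets its own name. A simplified illustration of the pattern (not the handler code itself):

    from typing import Dict, Iterable, List

    def flatten_state_groups(groups_d: Dict[int, Iterable[str]]) -> List[Iterable[str]]:
        # Keep the dict under its own name, then bind the list separately so
        # each name has a single, stable type as far as mypy is concerned.
        state_sets = list(groups_d.values())  # type: List[Iterable[str]]
        state_sets.append(["some-other-state"])  # appending more sets is now fine
        return state_sets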
@@ -2968,6 +2972,7 @@ class FederationHandler(BaseHandler):
            )
            return result["max_stream_id"]
        else:
+           assert self.storage.persistence
            max_stream_token = await self.storage.persistence.persist_events(
                event_and_contexts, backfilled=backfilled
            )
@@ -139,7 +139,7 @@ class MediaRepository:
    async def create_content(
        self,
        media_type: str,
-       upload_name: str,
+       upload_name: Optional[str],
        content: IO,
        content_length: int,
        auth_user: str,
@@ -147,8 +147,8 @@ class MediaRepository:
        """Store uploaded content for a local user and return the mxc URL

        Args:
-           media_type: The content type of the file
-           upload_name: The name of the file
+           media_type: The content type of the file.
+           upload_name: The name of the file, if provided.
            content: A file like object that is the content to store
            content_length: The length of the content
            auth_user: The user_id of the uploader
@@ -156,6 +156,7 @@ class MediaRepository:
        Returns:
            The mxc url of the stored content
        """

        media_id = random_string(24)

        file_info = FileInfo(server_name=None, file_id=media_id)
@@ -63,6 +63,10 @@ class UploadResource(DirectServeJsonResource):
                    msg="Invalid UTF-8 filename parameter: %r" % (upload_name), code=400
                )

+       # If the name is falsey (e.g. an empty byte string) ensure it is None.
+       else:
+           upload_name = None

        headers = request.requestHeaders

        if headers.hasHeader(b"Content-Type"):
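Taken together with the `Optional[str]` change to `create_content`, the fix amounts to normalising a falsey filename to `None` before it reaches the media store. A standalone sketch of that normalisation (error handling for bad UTF-8 omitted; the real resource raises a `SynapseError` as shown above):

    from typing import Optional

    def normalise_upload_name(upload_name_bytes: Optional[bytes]) -> Optional[str]:
        # An absent *or empty* filename becomes None rather than "", so no
        # falsey name is ever persisted alongside the media.
        if upload_name_bytes:
            return upload_name_bytes.decode("utf8")
        return None

    assert normalise_upload_name(b"cat.png") == "cat.png"
    assert normalise_upload_name(b"") is None
    assert normalise_upload_name(None) is None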
@@ -24,7 +24,7 @@ from synapse.storage.databases.state.bg_updates import StateBackgroundUpdateStore
from synapse.storage.state import StateFilter
from synapse.storage.types import Cursor
from synapse.storage.util.sequence import build_sequence_generator
-from synapse.types import StateMap
+from synapse.types import MutableStateMap, StateMap
from synapse.util.caches.descriptors import cached
from synapse.util.caches.dictionary_cache import DictionaryCache
@@ -208,7 +208,7 @@ class StateGroupDataStore(StateBackgroundUpdateStore, SQLBaseStore):

    async def _get_state_for_groups(
        self, groups: Iterable[int], state_filter: StateFilter = StateFilter.all()
-   ) -> Dict[int, StateMap[str]]:
+   ) -> Dict[int, MutableStateMap[str]]:
        """Gets the state at each of a list of state groups, optionally
        filtering by type/state_key
@@ -197,7 +197,7 @@ class EventsPersistenceStorage:

    async def persist_events(
        self,
-       events_and_contexts: List[Tuple[EventBase, EventContext]],
+       events_and_contexts: Iterable[Tuple[EventBase, EventContext]],
        backfilled: bool = False,
    ) -> RoomStreamToken:
        """
@@ -20,7 +20,7 @@ import attr

from synapse.api.constants import EventTypes
from synapse.events import EventBase
-from synapse.types import StateMap
+from synapse.types import MutableStateMap, StateMap

logger = logging.getLogger(__name__)
@@ -349,7 +349,7 @@ class StateGroupStorage:

    async def get_state_groups_ids(
        self, _room_id: str, event_ids: Iterable[str]
-   ) -> Dict[int, StateMap[str]]:
+   ) -> Dict[int, MutableStateMap[str]]:
        """Get the event IDs of all the state for the state groups for the given events

        Args:
@@ -532,7 +532,7 @@ class StateGroupStorage:

    def _get_state_for_groups(
        self, groups: Iterable[int], state_filter: StateFilter = StateFilter.all()
-   ) -> Awaitable[Dict[int, StateMap[str]]]:
+   ) -> Awaitable[Dict[int, MutableStateMap[str]]]:
        """Gets the state at each of a list of state groups, optionally
        filtering by type/state_key
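The `StateMap` to `MutableStateMap` swaps matter because callers of these storage functions modify the returned dictionaries in place; a read-only `Mapping` annotation would turn those mutations into type errors. Roughly, the two aliases differ only in mutability, something like the sketch below (consult `synapse.types` for the real definitions):

    from typing import Mapping, MutableMapping, Tuple, TypeVar

    T = TypeVar("T")
    StateKey = Tuple[str, str]  # (event type, state_key)

    StateMap = Mapping[StateKey, T]                # read-only view of room state
    MutableStateMap = MutableMapping[StateKey, T]  # callers may add/remove entries

    def add_membership(state: "MutableStateMap[str]", user_id: str, event_id: str) -> None:
        # Only legal against the mutable alias; a plain StateMap has no __setitem__.
        state[("m.room.member", user_id)] = event_id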
@@ -273,6 +273,19 @@ class MultiWriterIdGenerator:

        # Load the current positions of all writers for the stream.
        if self._writers:
+           # We delete any stale entries in the positions table. This is
+           # important if we add back a writer after a long time; we want to
+           # consider that a "new" writer, rather than using the old stale
+           # entry here.
+           sql = """
+               DELETE FROM stream_positions
+               WHERE
+                   stream_name = ?
+                   AND instance_name != ALL(?)
+           """
+           sql = self._db.engine.convert_param_style(sql)
+           cur.execute(sql, (self._stream_name, self._writers))
+
            sql = """
                SELECT instance_name, stream_id FROM stream_positions
                WHERE stream_name = ?
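The point of the new DELETE is easiest to see with a simplified model: positions belonging to writers that are no longer configured must not survive, otherwise a writer re-added after a long absence would be resumed from its ancient position instead of being treated as new. A hedged sketch of the pruning idea, without the database:

    from typing import Dict, List

    def prune_positions(positions: Dict[str, int], writers: List[str]) -> Dict[str, int]:
        # Drop rows for writers that are no longer in the configuration.
        return {name: pos for name, pos in positions.items() if name in writers}

    positions = {"event_persister1": 10000, "long_gone_worker": 37}
    assert prune_positions(positions, ["event_persister1"]) == {"event_persister1": 10000}
    # 'long_gone_worker' no longer anchors the stream at position 37.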
@@ -453,11 +466,22 @@ class MultiWriterIdGenerator:
        """Returns the position of the given writer.
        """

+       # If we don't have an entry for the given instance name, we assume it's a
+       # new writer.
+       #
+       # For new writers we assume their initial position to be the current
+       # persisted up to position. This stops Synapse from doing a full table
+       # scan when a new writer announces itself over replication.
        with self._lock:
-           return self._return_factor * self._current_positions.get(instance_name, 0)
+           return self._return_factor * self._current_positions.get(
+               instance_name, self._persisted_upto_position
+           )

    def get_positions(self) -> Dict[str, int]:
        """Get a copy of the current positon map.

+       Note that this won't necessarily include all configured writers if some
+       writers haven't written anything yet.
        """

        with self._lock:
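The behaviour change in `get_current_token_for_writer` boils down to the dictionary default: a writer with no recorded position now starts from the stream's persisted-up-to position instead of 0, so consumers never ask for the entire stream from the beginning. A minimal sketch of just that lookup:

    from typing import Dict

    def token_for_writer(positions: Dict[str, int], persisted_upto: int, name: str) -> int:
        # Falling back to persisted_upto (rather than 0) is what avoids the
        # full table scan when a brand-new writer announces itself.
        return positions.get(name, persisted_upto)

    assert token_for_writer({"first": 6}, 6, "first") == 6
    assert token_for_writer({"first": 6}, 6, "brand_new_writer") == 6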
@@ -390,17 +390,28 @@ class MultiWriterIdGeneratorTestCase(HomeserverTestCase):
        # Initial config has two writers
        id_gen = self._create_id_generator("first", writers=["first", "second"])
        self.assertEqual(id_gen.get_persisted_upto_position(), 3)
        self.assertEqual(id_gen.get_current_token_for_writer("first"), 3)
        self.assertEqual(id_gen.get_current_token_for_writer("second"), 5)

        # New config removes one of the configs. Note that if the writer is
        # removed from config we assume that it has been shut down and has
        # finished persisting, hence why the persisted upto position is 5.
        id_gen_2 = self._create_id_generator("second", writers=["second"])
        self.assertEqual(id_gen_2.get_persisted_upto_position(), 5)
        self.assertEqual(id_gen_2.get_current_token_for_writer("second"), 5)

        # This config points to a single, previously unused writer.
        id_gen_3 = self._create_id_generator("third", writers=["third"])
        self.assertEqual(id_gen_3.get_persisted_upto_position(), 5)

        # For new writers we assume their initial position to be the current
        # persisted up to position. This stops Synapse from doing a full table
        # scan when a new writer comes along.
        self.assertEqual(id_gen_3.get_current_token_for_writer("third"), 5)

        id_gen_4 = self._create_id_generator("fourth", writers=["third"])
        self.assertEqual(id_gen_4.get_current_token_for_writer("third"), 5)

        # Check that we get a sane next stream ID with this new config.

        async def _get_next_async():
@@ -410,6 +421,13 @@ class MultiWriterIdGeneratorTestCase(HomeserverTestCase):
        self.get_success(_get_next_async())
        self.assertEqual(id_gen_3.get_persisted_upto_position(), 6)

+       # If we add back the old "first" then we shouldn't see the persisted up
+       # to position revert back to 3.
+       id_gen_5 = self._create_id_generator("five", writers=["first", "third"])
+       self.assertEqual(id_gen_5.get_persisted_upto_position(), 6)
+       self.assertEqual(id_gen_5.get_current_token_for_writer("first"), 6)
+       self.assertEqual(id_gen_5.get_current_token_for_writer("third"), 6)
+
    def test_sequence_consistency(self):
        """Test that we error out if the table and sequence diverges.
        """