From f807f7f80442aec48a5e1f6b6b6f1a88e707b1e2 Mon Sep 17 00:00:00 2001 From: hera Date: Thu, 12 Oct 2017 10:50:44 +0000 Subject: [PATCH 01/53] log when we get an exception handling replication updates --- synapse/replication/tcp/resource.py | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/synapse/replication/tcp/resource.py b/synapse/replication/tcp/resource.py index 3ea3ca5a6f..6c1beca4e3 100644 --- a/synapse/replication/tcp/resource.py +++ b/synapse/replication/tcp/resource.py @@ -160,7 +160,11 @@ class ReplicationStreamer(object): "Getting stream: %s: %s -> %s", stream.NAME, stream.last_token, stream.upto_token ) - updates, current_token = yield stream.get_updates() + try: + updates, current_token = yield stream.get_updates() + except: + logger.info("Failed to handle stream %s", stream.NAME) + raise logger.debug( "Sending %d updates to %d connections", From bf4fb1fb400daad23702bc0b3231ec069d68e87e Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 12 Oct 2017 15:20:59 +0100 Subject: [PATCH 02/53] Basic implementation of backup media store --- synapse/config/repository.py | 18 ++ synapse/rest/media/v1/media_repository.py | 219 +++++++++++----------- synapse/rest/media/v1/thumbnailer.py | 16 +- synapse/rest/media/v1/upload_resource.py | 2 +- 4 files changed, 130 insertions(+), 125 deletions(-) diff --git a/synapse/config/repository.py b/synapse/config/repository.py index 2c6f57168e..e3c83d56fa 100644 --- a/synapse/config/repository.py +++ b/synapse/config/repository.py @@ -70,7 +70,17 @@ class ContentRepositoryConfig(Config): self.max_upload_size = self.parse_size(config["max_upload_size"]) self.max_image_pixels = self.parse_size(config["max_image_pixels"]) self.max_spider_size = self.parse_size(config["max_spider_size"]) + self.media_store_path = self.ensure_directory(config["media_store_path"]) + + self.backup_media_store_path = config.get("backup_media_store_path") + if self.backup_media_store_path: + self.ensure_directory(self.backup_media_store_path) + + self.synchronous_backup_media_store = config.get( + "synchronous_backup_media_store", False + ) + self.uploads_path = self.ensure_directory(config["uploads_path"]) self.dynamic_thumbnails = config["dynamic_thumbnails"] self.thumbnail_requirements = parse_thumbnail_requirements( @@ -115,6 +125,14 @@ class ContentRepositoryConfig(Config): # Directory where uploaded images and attachments are stored. media_store_path: "%(media_store)s" + # A secondary directory where uploaded images and attachments are + # stored as a backup. + # backup_media_store_path: "%(media_store)s" + + # Whether to wait for successful write to backup media store before + # returning successfully. + # synchronous_backup_media_store: false + # Directory where in-progress uploads are stored. uploads_path: "%(uploads_path)s" diff --git a/synapse/rest/media/v1/media_repository.py b/synapse/rest/media/v1/media_repository.py index 0ea1248ce6..3b442cc16b 100644 --- a/synapse/rest/media/v1/media_repository.py +++ b/synapse/rest/media/v1/media_repository.py @@ -33,7 +33,7 @@ from synapse.api.errors import SynapseError, HttpResponseException, \ from synapse.util.async import Linearizer from synapse.util.stringutils import is_ascii -from synapse.util.logcontext import preserve_context_over_fn +from synapse.util.logcontext import preserve_context_over_fn, preserve_fn from synapse.util.retryutils import NotRetryingDestination import os @@ -59,7 +59,12 @@ class MediaRepository(object): self.store = hs.get_datastore() self.max_upload_size = hs.config.max_upload_size self.max_image_pixels = hs.config.max_image_pixels + self.filepaths = MediaFilePaths(hs.config.media_store_path) + self.backup_filepaths = None + if hs.config.backup_media_store_path: + self.backup_filepaths = MediaFilePaths(hs.config.backup_media_store_path) + self.dynamic_thumbnails = hs.config.dynamic_thumbnails self.thumbnail_requirements = hs.config.thumbnail_requirements @@ -87,18 +92,43 @@ class MediaRepository(object): if not os.path.exists(dirname): os.makedirs(dirname) + @defer.inlineCallbacks + def _write_to_file(self, source, file_name_func): + def write_file_thread(file_name): + source.seek(0) # Ensure we read from the start of the file + with open(file_name, "wb") as f: + shutil.copyfileobj(source, f) + + fname = file_name_func(self.filepaths) + self._makedirs(fname) + + # Write to the main repository + yield preserve_context_over_fn(threads.deferToThread, write_file_thread, fname) + + # Write to backup repository + if self.backup_filepaths: + backup_fname = file_name_func(backup_filepaths) + self._makedirs(backup_fname) + + # We can either wait for successful writing to the backup repository + # or write in the background and immediately return + if hs.config.synchronous_backup_media_store: + yield preserve_context_over_fn( + threads.deferToThread, write_file_thread, backup_fname, + ) + else: + preserve_fn(threads.deferToThread)(write_file, backup_fname) + + defer.returnValue(fname) + @defer.inlineCallbacks def create_content(self, media_type, upload_name, content, content_length, auth_user): media_id = random_string(24) - fname = self.filepaths.local_media_filepath(media_id) - self._makedirs(fname) - - # This shouldn't block for very long because the content will have - # already been uploaded at this point. - with open(fname, "wb") as f: - f.write(content) + fname = yield self._write_to_file( + content, lambda f: f.local_media_filepath(media_id) + ) logger.info("Stored local media in file %r", fname) @@ -253,9 +283,8 @@ class MediaRepository(object): def _get_thumbnail_requirements(self, media_type): return self.thumbnail_requirements.get(media_type, ()) - def _generate_thumbnail(self, input_path, t_path, t_width, t_height, + def _generate_thumbnail(self, thumbnailer, t_width, t_height, t_method, t_type): - thumbnailer = Thumbnailer(input_path) m_width = thumbnailer.width m_height = thumbnailer.height @@ -267,36 +296,40 @@ class MediaRepository(object): return if t_method == "crop": - t_len = thumbnailer.crop(t_path, t_width, t_height, t_type) + t_byte_source = thumbnailer.crop(t_width, t_height, t_type) elif t_method == "scale": t_width, t_height = thumbnailer.aspect(t_width, t_height) t_width = min(m_width, t_width) t_height = min(m_height, t_height) - t_len = thumbnailer.scale(t_path, t_width, t_height, t_type) + t_byte_source = thumbnailer.scale(t_width, t_height, t_type) else: - t_len = None + t_byte_source = None - return t_len + return t_byte_source @defer.inlineCallbacks def generate_local_exact_thumbnail(self, media_id, t_width, t_height, t_method, t_type): input_path = self.filepaths.local_media_filepath(media_id) - t_path = self.filepaths.local_media_thumbnail( - media_id, t_width, t_height, t_type, t_method - ) - self._makedirs(t_path) - - t_len = yield preserve_context_over_fn( + thumbnailer = Thumbnailer(input_path) + t_byte_source = yield preserve_context_over_fn( threads.deferToThread, self._generate_thumbnail, - input_path, t_path, t_width, t_height, t_method, t_type + thumbnailer, t_width, t_height, t_method, t_type ) - if t_len: + if t_byte_source: + output_path = yield self._write_to_file( + content, + lambda f: f.local_media_thumbnail( + media_id, t_width, t_height, t_type, t_method + ) + ) + logger.info("Stored thumbnail in file %r", output_path) + yield self.store.store_local_thumbnail( - media_id, t_width, t_height, t_type, t_method, t_len + media_id, t_width, t_height, t_type, t_method, len(t_byte_source.getvalue()) ) defer.returnValue(t_path) @@ -306,21 +339,25 @@ class MediaRepository(object): t_width, t_height, t_method, t_type): input_path = self.filepaths.remote_media_filepath(server_name, file_id) - t_path = self.filepaths.remote_media_thumbnail( - server_name, file_id, t_width, t_height, t_type, t_method - ) - self._makedirs(t_path) - - t_len = yield preserve_context_over_fn( + thumbnailer = Thumbnailer(input_path) + t_byte_source = yield preserve_context_over_fn( threads.deferToThread, self._generate_thumbnail, - input_path, t_path, t_width, t_height, t_method, t_type + thumbnailer, t_width, t_height, t_method, t_type ) - if t_len: + if t_byte_source: + output_path = yield self._write_to_file( + content, + lambda f: f.remote_media_thumbnail( + server_name, file_id, t_width, t_height, t_type, t_method + ) + ) + logger.info("Stored thumbnail in file %r", output_path) + yield self.store.store_remote_media_thumbnail( server_name, media_id, file_id, - t_width, t_height, t_type, t_method, t_len + t_width, t_height, t_type, t_method, len(t_byte_source.getvalue()) ) defer.returnValue(t_path) @@ -351,59 +388,32 @@ class MediaRepository(object): local_thumbnails = [] def generate_thumbnails(): - scales = set() - crops = set() for r_width, r_height, r_method, r_type in requirements: - if r_method == "scale": - t_width, t_height = thumbnailer.aspect(r_width, r_height) - scales.add(( - min(m_width, t_width), min(m_height, t_height), r_type, - )) - elif r_method == "crop": - crops.add((r_width, r_height, r_type)) - - for t_width, t_height, t_type in scales: - t_method = "scale" - if url_cache: - t_path = self.filepaths.url_cache_thumbnail( - media_id, t_width, t_height, t_type, t_method - ) - else: - t_path = self.filepaths.local_media_thumbnail( - media_id, t_width, t_height, t_type, t_method - ) - self._makedirs(t_path) - t_len = thumbnailer.scale(t_path, t_width, t_height, t_type) + t_byte_source = self._generate_thumbnail( + thumbnailer, r_width, r_height, r_method, r_type, + ) local_thumbnails.append(( - media_id, t_width, t_height, t_type, t_method, t_len - )) - - for t_width, t_height, t_type in crops: - if (t_width, t_height, t_type) in scales: - # If the aspect ratio of the cropped thumbnail matches a purely - # scaled one then there is no point in calculating a separate - # thumbnail. - continue - t_method = "crop" - if url_cache: - t_path = self.filepaths.url_cache_thumbnail( - media_id, t_width, t_height, t_type, t_method - ) - else: - t_path = self.filepaths.local_media_thumbnail( - media_id, t_width, t_height, t_type, t_method - ) - self._makedirs(t_path) - t_len = thumbnailer.crop(t_path, t_width, t_height, t_type) - local_thumbnails.append(( - media_id, t_width, t_height, t_type, t_method, t_len + r_width, r_height, r_method, r_type, t_byte_source )) yield preserve_context_over_fn(threads.deferToThread, generate_thumbnails) - for l in local_thumbnails: - yield self.store.store_local_thumbnail(*l) + for t_width, t_height, t_method, t_type, t_byte_source in local_thumbnails: + if url_cache: + path_name_func = lambda f: f.url_cache_thumbnail( + media_id, t_width, t_height, t_type, t_method + ) + else: + path_name_func = lambda f: f.local_media_thumbnail( + media_id, t_width, t_height, t_type, t_method + ) + + yield self._write_to_file(t_byte_source, path_name_func) + + yield self.store.store_local_thumbnail( + media_id, t_width, t_height, t_type, t_method, len(t_byte_source.getvalue()) + ) defer.returnValue({ "width": m_width, @@ -433,51 +443,32 @@ class MediaRepository(object): ) return - scales = set() - crops = set() for r_width, r_height, r_method, r_type in requirements: - if r_method == "scale": - t_width, t_height = thumbnailer.aspect(r_width, r_height) - scales.add(( - min(m_width, t_width), min(m_height, t_height), r_type, - )) - elif r_method == "crop": - crops.add((r_width, r_height, r_type)) - - for t_width, t_height, t_type in scales: - t_method = "scale" - t_path = self.filepaths.remote_media_thumbnail( - server_name, file_id, t_width, t_height, t_type, t_method + t_byte_source = self._generate_thumbnail( + thumbnailer, r_width, r_height, r_method, r_type, ) - self._makedirs(t_path) - t_len = thumbnailer.scale(t_path, t_width, t_height, t_type) - remote_thumbnails.append([ - server_name, media_id, file_id, - t_width, t_height, t_type, t_method, t_len - ]) - for t_width, t_height, t_type in crops: - if (t_width, t_height, t_type) in scales: - # If the aspect ratio of the cropped thumbnail matches a purely - # scaled one then there is no point in calculating a separate - # thumbnail. - continue - t_method = "crop" - t_path = self.filepaths.remote_media_thumbnail( - server_name, file_id, t_width, t_height, t_type, t_method - ) - self._makedirs(t_path) - t_len = thumbnailer.crop(t_path, t_width, t_height, t_type) - remote_thumbnails.append([ - server_name, media_id, file_id, - t_width, t_height, t_type, t_method, t_len - ]) + remote_thumbnails.append(( + r_width, r_height, r_method, r_type, t_byte_source + )) yield preserve_context_over_fn(threads.deferToThread, generate_thumbnails) for r in remote_thumbnails: yield self.store.store_remote_media_thumbnail(*r) + for t_width, t_height, t_method, t_type, t_byte_source in local_thumbnails: + path_name_func = lambda f: f.remote_media_thumbnail( + server_name, media_id, file_id, t_width, t_height, t_type, t_method + ) + + yield self._write_to_file(t_byte_source, path_name_func) + + yield self.store.store_remote_media_thumbnail( + server_name, media_id, file_id, + t_width, t_height, t_type, t_method, len(t_byte_source.getvalue()) + ) + defer.returnValue({ "width": m_width, "height": m_height, diff --git a/synapse/rest/media/v1/thumbnailer.py b/synapse/rest/media/v1/thumbnailer.py index 3868d4f65f..60498b08aa 100644 --- a/synapse/rest/media/v1/thumbnailer.py +++ b/synapse/rest/media/v1/thumbnailer.py @@ -50,12 +50,12 @@ class Thumbnailer(object): else: return ((max_height * self.width) // self.height, max_height) - def scale(self, output_path, width, height, output_type): + def scale(self, width, height, output_type): """Rescales the image to the given dimensions""" scaled = self.image.resize((width, height), Image.ANTIALIAS) - return self.save_image(scaled, output_type, output_path) + return self._encode_image(scaled, output_type) - def crop(self, output_path, width, height, output_type): + def crop(self, width, height, output_type): """Rescales and crops the image to the given dimensions preserving aspect:: (w_in / h_in) = (w_scaled / h_scaled) @@ -82,13 +82,9 @@ class Thumbnailer(object): crop_left = (scaled_width - width) // 2 crop_right = width + crop_left cropped = scaled_image.crop((crop_left, 0, crop_right, height)) - return self.save_image(cropped, output_type, output_path) + return self._encode_image(cropped, output_type) - def save_image(self, output_image, output_type, output_path): + def _encode_image(self, output_image, output_type): output_bytes_io = BytesIO() output_image.save(output_bytes_io, self.FORMATS[output_type], quality=80) - output_bytes = output_bytes_io.getvalue() - with open(output_path, "wb") as output_file: - output_file.write(output_bytes) - logger.info("Stored thumbnail in file %r", output_path) - return len(output_bytes) + return output_bytes_io diff --git a/synapse/rest/media/v1/upload_resource.py b/synapse/rest/media/v1/upload_resource.py index 4ab33f73bf..f6f498cdc5 100644 --- a/synapse/rest/media/v1/upload_resource.py +++ b/synapse/rest/media/v1/upload_resource.py @@ -93,7 +93,7 @@ class UploadResource(Resource): # TODO(markjh): parse content-dispostion content_uri = yield self.media_repo.create_content( - media_type, upload_name, request.content.read(), + media_type, upload_name, request.content, content_length, requester.user ) From 67cb89fbdf62dfb2ff65f6f7f0ca23445cdac0ac Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 12 Oct 2017 15:23:41 +0100 Subject: [PATCH 03/53] Fix typo --- synapse/rest/media/v1/media_repository.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/synapse/rest/media/v1/media_repository.py b/synapse/rest/media/v1/media_repository.py index 3b442cc16b..f26f793bed 100644 --- a/synapse/rest/media/v1/media_repository.py +++ b/synapse/rest/media/v1/media_repository.py @@ -107,7 +107,7 @@ class MediaRepository(object): # Write to backup repository if self.backup_filepaths: - backup_fname = file_name_func(backup_filepaths) + backup_fname = file_name_func(self.backup_filepaths) self._makedirs(backup_fname) # We can either wait for successful writing to the backup repository From c8eeef6947af762c3eabef6ecca0f69833fbf8ab Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 12 Oct 2017 15:28:24 +0100 Subject: [PATCH 04/53] Fix typos --- synapse/rest/media/v1/media_repository.py | 46 +++++++++++++---------- 1 file changed, 26 insertions(+), 20 deletions(-) diff --git a/synapse/rest/media/v1/media_repository.py b/synapse/rest/media/v1/media_repository.py index f26f793bed..a16034fd67 100644 --- a/synapse/rest/media/v1/media_repository.py +++ b/synapse/rest/media/v1/media_repository.py @@ -65,6 +65,8 @@ class MediaRepository(object): if hs.config.backup_media_store_path: self.backup_filepaths = MediaFilePaths(hs.config.backup_media_store_path) + self.synchronous_backup_media_store = hs.config.synchronous_backup_media_store + self.dynamic_thumbnails = hs.config.dynamic_thumbnails self.thumbnail_requirements = hs.config.thumbnail_requirements @@ -112,12 +114,12 @@ class MediaRepository(object): # We can either wait for successful writing to the backup repository # or write in the background and immediately return - if hs.config.synchronous_backup_media_store: + if self.synchronous_backup_media_store: yield preserve_context_over_fn( threads.deferToThread, write_file_thread, backup_fname, ) else: - preserve_fn(threads.deferToThread)(write_file, backup_fname) + preserve_fn(threads.deferToThread)(write_file_thread, backup_fname) defer.returnValue(fname) @@ -321,7 +323,7 @@ class MediaRepository(object): if t_byte_source: output_path = yield self._write_to_file( - content, + t_byte_source, lambda f: f.local_media_thumbnail( media_id, t_width, t_height, t_type, t_method ) @@ -329,10 +331,11 @@ class MediaRepository(object): logger.info("Stored thumbnail in file %r", output_path) yield self.store.store_local_thumbnail( - media_id, t_width, t_height, t_type, t_method, len(t_byte_source.getvalue()) + media_id, t_width, t_height, t_type, t_method, + len(t_byte_source.getvalue()) ) - defer.returnValue(t_path) + defer.returnValue(output_path) @defer.inlineCallbacks def generate_remote_exact_thumbnail(self, server_name, file_id, media_id, @@ -348,7 +351,7 @@ class MediaRepository(object): if t_byte_source: output_path = yield self._write_to_file( - content, + t_byte_source, lambda f: f.remote_media_thumbnail( server_name, file_id, t_width, t_height, t_type, t_method ) @@ -360,7 +363,7 @@ class MediaRepository(object): t_width, t_height, t_type, t_method, len(t_byte_source.getvalue()) ) - defer.returnValue(t_path) + defer.returnValue(output_path) @defer.inlineCallbacks def _generate_local_thumbnails(self, media_id, media_info, url_cache=False): @@ -400,19 +403,21 @@ class MediaRepository(object): yield preserve_context_over_fn(threads.deferToThread, generate_thumbnails) for t_width, t_height, t_method, t_type, t_byte_source in local_thumbnails: - if url_cache: - path_name_func = lambda f: f.url_cache_thumbnail( - media_id, t_width, t_height, t_type, t_method - ) - else: - path_name_func = lambda f: f.local_media_thumbnail( - media_id, t_width, t_height, t_type, t_method - ) + def path_name_func(f): + if url_cache: + return f.url_cache_thumbnail( + media_id, t_width, t_height, t_type, t_method + ) + else: + return f.local_media_thumbnail( + media_id, t_width, t_height, t_type, t_method + ) yield self._write_to_file(t_byte_source, path_name_func) yield self.store.store_local_thumbnail( - media_id, t_width, t_height, t_type, t_method, len(t_byte_source.getvalue()) + media_id, t_width, t_height, t_type, t_method, + len(t_byte_source.getvalue()) ) defer.returnValue({ @@ -457,10 +462,11 @@ class MediaRepository(object): for r in remote_thumbnails: yield self.store.store_remote_media_thumbnail(*r) - for t_width, t_height, t_method, t_type, t_byte_source in local_thumbnails: - path_name_func = lambda f: f.remote_media_thumbnail( - server_name, media_id, file_id, t_width, t_height, t_type, t_method - ) + for t_width, t_height, t_method, t_type, t_byte_source in remote_thumbnails: + def path_name_func(f): + return f.remote_media_thumbnail( + server_name, media_id, file_id, t_width, t_height, t_type, t_method + ) yield self._write_to_file(t_byte_source, path_name_func) From 6dfde6d4856695890271232f8a2e4c5f32615dd1 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 12 Oct 2017 15:30:26 +0100 Subject: [PATCH 05/53] Remove dead code --- synapse/rest/media/v1/media_repository.py | 3 --- 1 file changed, 3 deletions(-) diff --git a/synapse/rest/media/v1/media_repository.py b/synapse/rest/media/v1/media_repository.py index a16034fd67..1eeb128d2a 100644 --- a/synapse/rest/media/v1/media_repository.py +++ b/synapse/rest/media/v1/media_repository.py @@ -459,9 +459,6 @@ class MediaRepository(object): yield preserve_context_over_fn(threads.deferToThread, generate_thumbnails) - for r in remote_thumbnails: - yield self.store.store_remote_media_thumbnail(*r) - for t_width, t_height, t_method, t_type, t_byte_source in remote_thumbnails: def path_name_func(f): return f.remote_media_thumbnail( From b77a13812c38b2e79b2ebfddb52ce88a2ac8e9b9 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 12 Oct 2017 15:32:32 +0100 Subject: [PATCH 06/53] Typo --- synapse/rest/media/v1/media_repository.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/synapse/rest/media/v1/media_repository.py b/synapse/rest/media/v1/media_repository.py index 1eeb128d2a..93b35af9cf 100644 --- a/synapse/rest/media/v1/media_repository.py +++ b/synapse/rest/media/v1/media_repository.py @@ -462,7 +462,7 @@ class MediaRepository(object): for t_width, t_height, t_method, t_type, t_byte_source in remote_thumbnails: def path_name_func(f): return f.remote_media_thumbnail( - server_name, media_id, file_id, t_width, t_height, t_type, t_method + server_name, file_id, t_width, t_height, t_type, t_method ) yield self._write_to_file(t_byte_source, path_name_func) From e283b555b1f20de4fd393fd947e82eb3c635b7e9 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 12 Oct 2017 17:31:24 +0100 Subject: [PATCH 07/53] Copy everything to backup --- synapse/config/repository.py | 4 +- synapse/rest/media/v1/filepath.py | 99 ++++++++++------ synapse/rest/media/v1/media_repository.py | 109 +++++++++++------- synapse/rest/media/v1/preview_url_resource.py | 7 +- synapse/rest/media/v1/thumbnailer.py | 9 +- 5 files changed, 151 insertions(+), 77 deletions(-) diff --git a/synapse/config/repository.py b/synapse/config/repository.py index e3c83d56fa..6baa474931 100644 --- a/synapse/config/repository.py +++ b/synapse/config/repository.py @@ -75,7 +75,9 @@ class ContentRepositoryConfig(Config): self.backup_media_store_path = config.get("backup_media_store_path") if self.backup_media_store_path: - self.ensure_directory(self.backup_media_store_path) + self.backup_media_store_path = self.ensure_directory( + self.backup_media_store_path + ) self.synchronous_backup_media_store = config.get( "synchronous_backup_media_store", False diff --git a/synapse/rest/media/v1/filepath.py b/synapse/rest/media/v1/filepath.py index d5cec10127..43d0eea00d 100644 --- a/synapse/rest/media/v1/filepath.py +++ b/synapse/rest/media/v1/filepath.py @@ -15,103 +15,134 @@ import os import re +import functools NEW_FORMAT_ID_RE = re.compile(r"^\d\d\d\d-\d\d-\d\d") +def _wrap_in_base_path(func): + """Takes a function that returns a relative path and turns it into an + absolute path based on the location of the primary media store + """ + @functools.wraps(func) + def _wrapped(self, *args, **kwargs): + path = func(self, *args, **kwargs) + return os.path.join(self.primary_base_path, path) + + return _wrapped + + class MediaFilePaths(object): + """Describes where files are stored on disk. - def __init__(self, base_path): - self.base_path = base_path + Most of the function have a `*_rel` variant which returns a file path that + is relative to the base media store path. This is mainly used when we want + to write to the backup media store (when one is configured) + """ - def default_thumbnail(self, default_top_level, default_sub_type, width, - height, content_type, method): + def __init__(self, primary_base_path): + self.primary_base_path = primary_base_path + + def default_thumbnail_rel(self, default_top_level, default_sub_type, width, + height, content_type, method): top_level_type, sub_type = content_type.split("/") file_name = "%i-%i-%s-%s-%s" % ( width, height, top_level_type, sub_type, method ) return os.path.join( - self.base_path, "default_thumbnails", default_top_level, + "default_thumbnails", default_top_level, default_sub_type, file_name ) - def local_media_filepath(self, media_id): + default_thumbnail = _wrap_in_base_path(default_thumbnail_rel) + + def local_media_filepath_rel(self, media_id): return os.path.join( - self.base_path, "local_content", + "local_content", media_id[0:2], media_id[2:4], media_id[4:] ) - def local_media_thumbnail(self, media_id, width, height, content_type, - method): + local_media_filepath = _wrap_in_base_path(local_media_filepath_rel) + + def local_media_thumbnail_rel(self, media_id, width, height, content_type, + method): top_level_type, sub_type = content_type.split("/") file_name = "%i-%i-%s-%s-%s" % ( width, height, top_level_type, sub_type, method ) return os.path.join( - self.base_path, "local_thumbnails", + "local_thumbnails", media_id[0:2], media_id[2:4], media_id[4:], file_name ) - def remote_media_filepath(self, server_name, file_id): + local_media_thumbnail = _wrap_in_base_path(local_media_thumbnail_rel) + + def remote_media_filepath_rel(self, server_name, file_id): return os.path.join( - self.base_path, "remote_content", server_name, + "remote_content", server_name, file_id[0:2], file_id[2:4], file_id[4:] ) - def remote_media_thumbnail(self, server_name, file_id, width, height, - content_type, method): + remote_media_filepath = _wrap_in_base_path(remote_media_filepath_rel) + + def remote_media_thumbnail_rel(self, server_name, file_id, width, height, + content_type, method): top_level_type, sub_type = content_type.split("/") file_name = "%i-%i-%s-%s" % (width, height, top_level_type, sub_type) return os.path.join( - self.base_path, "remote_thumbnail", server_name, + "remote_thumbnail", server_name, file_id[0:2], file_id[2:4], file_id[4:], file_name ) + remote_media_thumbnail = _wrap_in_base_path(remote_media_thumbnail_rel) + def remote_media_thumbnail_dir(self, server_name, file_id): return os.path.join( - self.base_path, "remote_thumbnail", server_name, + "remote_thumbnail", server_name, file_id[0:2], file_id[2:4], file_id[4:], ) - def url_cache_filepath(self, media_id): + def url_cache_filepath_rel(self, media_id): if NEW_FORMAT_ID_RE.match(media_id): # Media id is of the form # E.g.: 2017-09-28-fsdRDt24DS234dsf return os.path.join( - self.base_path, "url_cache", + "url_cache", media_id[:10], media_id[11:] ) else: return os.path.join( - self.base_path, "url_cache", + "url_cache", media_id[0:2], media_id[2:4], media_id[4:], ) + url_cache_filepath = _wrap_in_base_path(url_cache_filepath_rel) + def url_cache_filepath_dirs_to_delete(self, media_id): "The dirs to try and remove if we delete the media_id file" if NEW_FORMAT_ID_RE.match(media_id): return [ os.path.join( - self.base_path, "url_cache", + "url_cache", media_id[:10], ), ] else: return [ os.path.join( - self.base_path, "url_cache", + "url_cache", media_id[0:2], media_id[2:4], ), os.path.join( - self.base_path, "url_cache", + "url_cache", media_id[0:2], ), ] - def url_cache_thumbnail(self, media_id, width, height, content_type, - method): + def url_cache_thumbnail_rel(self, media_id, width, height, content_type, + method): # Media id is of the form # E.g.: 2017-09-28-fsdRDt24DS234dsf @@ -122,29 +153,31 @@ class MediaFilePaths(object): if NEW_FORMAT_ID_RE.match(media_id): return os.path.join( - self.base_path, "url_cache_thumbnails", + "url_cache_thumbnails", media_id[:10], media_id[11:], file_name ) else: return os.path.join( - self.base_path, "url_cache_thumbnails", + "url_cache_thumbnails", media_id[0:2], media_id[2:4], media_id[4:], file_name ) + url_cache_thumbnail = _wrap_in_base_path(url_cache_thumbnail_rel) + def url_cache_thumbnail_directory(self, media_id): # Media id is of the form # E.g.: 2017-09-28-fsdRDt24DS234dsf if NEW_FORMAT_ID_RE.match(media_id): return os.path.join( - self.base_path, "url_cache_thumbnails", + "url_cache_thumbnails", media_id[:10], media_id[11:], ) else: return os.path.join( - self.base_path, "url_cache_thumbnails", + "url_cache_thumbnails", media_id[0:2], media_id[2:4], media_id[4:], ) @@ -155,26 +188,26 @@ class MediaFilePaths(object): if NEW_FORMAT_ID_RE.match(media_id): return [ os.path.join( - self.base_path, "url_cache_thumbnails", + "url_cache_thumbnails", media_id[:10], media_id[11:], ), os.path.join( - self.base_path, "url_cache_thumbnails", + "url_cache_thumbnails", media_id[:10], ), ] else: return [ os.path.join( - self.base_path, "url_cache_thumbnails", + "url_cache_thumbnails", media_id[0:2], media_id[2:4], media_id[4:], ), os.path.join( - self.base_path, "url_cache_thumbnails", + "url_cache_thumbnails", media_id[0:2], media_id[2:4], ), os.path.join( - self.base_path, "url_cache_thumbnails", + "url_cache_thumbnails", media_id[0:2], ), ] diff --git a/synapse/rest/media/v1/media_repository.py b/synapse/rest/media/v1/media_repository.py index 93b35af9cf..398e973ca9 100644 --- a/synapse/rest/media/v1/media_repository.py +++ b/synapse/rest/media/v1/media_repository.py @@ -60,10 +60,12 @@ class MediaRepository(object): self.max_upload_size = hs.config.max_upload_size self.max_image_pixels = hs.config.max_image_pixels - self.filepaths = MediaFilePaths(hs.config.media_store_path) - self.backup_filepaths = None + self.primary_base_path = hs.config.media_store_path + self.filepaths = MediaFilePaths(self.primary_base_path) + + self.backup_base_path = None if hs.config.backup_media_store_path: - self.backup_filepaths = MediaFilePaths(hs.config.backup_media_store_path) + self.backup_base_path = hs.config.backup_media_store_path self.synchronous_backup_media_store = hs.config.synchronous_backup_media_store @@ -94,42 +96,63 @@ class MediaRepository(object): if not os.path.exists(dirname): os.makedirs(dirname) - @defer.inlineCallbacks - def _write_to_file(self, source, file_name_func): - def write_file_thread(file_name): - source.seek(0) # Ensure we read from the start of the file - with open(file_name, "wb") as f: - shutil.copyfileobj(source, f) + @staticmethod + def write_file_synchronously(source, fname): + source.seek(0) # Ensure we read from the start of the file + with open(fname, "wb") as f: + shutil.copyfileobj(source, f) - fname = file_name_func(self.filepaths) + @defer.inlineCallbacks + def write_to_file(self, source, path): + """Write `source` to the on disk media store, and also the backup store + if configured. + + Args: + source: A file like object that should be written + path: Relative path to write file to + + Returns: + string: the file path written to in the primary media store + """ + fname = os.path.join(self.primary_base_path, path) self._makedirs(fname) # Write to the main repository - yield preserve_context_over_fn(threads.deferToThread, write_file_thread, fname) + yield preserve_context_over_fn( + threads.deferToThread, + self.write_file_synchronously, source, fname, + ) # Write to backup repository - if self.backup_filepaths: - backup_fname = file_name_func(self.backup_filepaths) + yield self.copy_to_backup(source, path) + + defer.returnValue(fname) + + @defer.inlineCallbacks + def copy_to_backup(self, source, path): + if self.backup_base_path: + backup_fname = os.path.join(self.backup_base_path, path) self._makedirs(backup_fname) # We can either wait for successful writing to the backup repository # or write in the background and immediately return if self.synchronous_backup_media_store: yield preserve_context_over_fn( - threads.deferToThread, write_file_thread, backup_fname, + threads.deferToThread, + self.write_file_synchronously, source, backup_fname, ) else: - preserve_fn(threads.deferToThread)(write_file_thread, backup_fname) - - defer.returnValue(fname) + preserve_fn(threads.deferToThread)( + self.write_file_synchronously, source, backup_fname, + ) @defer.inlineCallbacks def create_content(self, media_type, upload_name, content, content_length, auth_user): media_id = random_string(24) - fname = yield self._write_to_file( - content, lambda f: f.local_media_filepath(media_id) + fname = yield self.write_to_file( + content, self.filepaths.local_media_filepath_rel(media_id) ) logger.info("Stored local media in file %r", fname) @@ -180,9 +203,10 @@ class MediaRepository(object): def _download_remote_file(self, server_name, media_id): file_id = random_string(24) - fname = self.filepaths.remote_media_filepath( + fpath = self.filepaths.remote_media_filepath_rel( server_name, file_id ) + fname = os.path.join(self.primary_base_path, fpath) self._makedirs(fname) try: @@ -224,6 +248,9 @@ class MediaRepository(object): server_name, media_id) raise SynapseError(502, "Failed to fetch remote media") + with open(fname) as f: + yield self.copy_to_backup(f, fpath) + media_type = headers["Content-Type"][0] time_now_ms = self.clock.time_msec() @@ -322,15 +349,15 @@ class MediaRepository(object): ) if t_byte_source: - output_path = yield self._write_to_file( + output_path = yield self.write_to_file( t_byte_source, - lambda f: f.local_media_thumbnail( + self.filepaths.local_media_thumbnail_rel( media_id, t_width, t_height, t_type, t_method ) ) logger.info("Stored thumbnail in file %r", output_path) - yield self.store.store_local_thumbnail( + yield self.store.store_local_thumbnail_rel( media_id, t_width, t_height, t_type, t_method, len(t_byte_source.getvalue()) ) @@ -350,15 +377,15 @@ class MediaRepository(object): ) if t_byte_source: - output_path = yield self._write_to_file( + output_path = yield self.write_to_file( t_byte_source, - lambda f: f.remote_media_thumbnail( + self.filepaths.remote_media_thumbnail_rel( server_name, file_id, t_width, t_height, t_type, t_method ) ) logger.info("Stored thumbnail in file %r", output_path) - yield self.store.store_remote_media_thumbnail( + yield self.store.store_remote_media_thumbnail_rel( server_name, media_id, file_id, t_width, t_height, t_type, t_method, len(t_byte_source.getvalue()) ) @@ -403,17 +430,16 @@ class MediaRepository(object): yield preserve_context_over_fn(threads.deferToThread, generate_thumbnails) for t_width, t_height, t_method, t_type, t_byte_source in local_thumbnails: - def path_name_func(f): - if url_cache: - return f.url_cache_thumbnail( - media_id, t_width, t_height, t_type, t_method - ) - else: - return f.local_media_thumbnail( - media_id, t_width, t_height, t_type, t_method - ) + if url_cache: + file_path = self.filepaths.url_cache_thumbnail_rel( + media_id, t_width, t_height, t_type, t_method + ) + else: + file_path = self.filepaths.local_media_thumbnail_rel( + media_id, t_width, t_height, t_type, t_method + ) - yield self._write_to_file(t_byte_source, path_name_func) + yield self.write_to_file(t_byte_source, file_path) yield self.store.store_local_thumbnail( media_id, t_width, t_height, t_type, t_method, @@ -460,12 +486,11 @@ class MediaRepository(object): yield preserve_context_over_fn(threads.deferToThread, generate_thumbnails) for t_width, t_height, t_method, t_type, t_byte_source in remote_thumbnails: - def path_name_func(f): - return f.remote_media_thumbnail( - server_name, file_id, t_width, t_height, t_type, t_method - ) + file_path = self.filepaths.remote_media_thumbnail_rel( + server_name, file_id, t_width, t_height, t_type, t_method + ) - yield self._write_to_file(t_byte_source, path_name_func) + yield self.write_to_file(t_byte_source, file_path) yield self.store.store_remote_media_thumbnail( server_name, media_id, file_id, @@ -491,6 +516,8 @@ class MediaRepository(object): logger.info("Deleting: %r", key) + # TODO: Should we delete from the backup store + with (yield self.remote_media_linearizer.queue(key)): full_path = self.filepaths.remote_media_filepath(origin, file_id) try: diff --git a/synapse/rest/media/v1/preview_url_resource.py b/synapse/rest/media/v1/preview_url_resource.py index 895b480d5c..f82b8fbc51 100644 --- a/synapse/rest/media/v1/preview_url_resource.py +++ b/synapse/rest/media/v1/preview_url_resource.py @@ -59,6 +59,7 @@ class PreviewUrlResource(Resource): self.store = hs.get_datastore() self.client = SpiderHttpClient(hs) self.media_repo = media_repo + self.primary_base_path = media_repo.primary_base_path self.url_preview_url_blacklist = hs.config.url_preview_url_blacklist @@ -262,7 +263,8 @@ class PreviewUrlResource(Resource): file_id = datetime.date.today().isoformat() + '_' + random_string(16) - fname = self.filepaths.url_cache_filepath(file_id) + fpath = self.filepaths.url_cache_filepath_rel(file_id) + fname = os.path.join(self.primary_base_path, fpath) self.media_repo._makedirs(fname) try: @@ -273,6 +275,9 @@ class PreviewUrlResource(Resource): ) # FIXME: pass through 404s and other error messages nicely + with open(fname) as f: + yield self.media_repo.copy_to_backup(f, fpath) + media_type = headers["Content-Type"][0] time_now_ms = self.clock.time_msec() diff --git a/synapse/rest/media/v1/thumbnailer.py b/synapse/rest/media/v1/thumbnailer.py index 60498b08aa..e1ee535b9a 100644 --- a/synapse/rest/media/v1/thumbnailer.py +++ b/synapse/rest/media/v1/thumbnailer.py @@ -51,7 +51,11 @@ class Thumbnailer(object): return ((max_height * self.width) // self.height, max_height) def scale(self, width, height, output_type): - """Rescales the image to the given dimensions""" + """Rescales the image to the given dimensions. + + Returns: + BytesIO: the bytes of the encoded image ready to be written to disk + """ scaled = self.image.resize((width, height), Image.ANTIALIAS) return self._encode_image(scaled, output_type) @@ -65,6 +69,9 @@ class Thumbnailer(object): Args: max_width: The largest possible width. max_height: The larget possible height. + + Returns: + BytesIO: the bytes of the encoded image ready to be written to disk """ if width * self.height > height * self.width: scaled_height = (width * self.height) // self.width From 802ca12d0551ff761e01d9af8348df1dc96fc372 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 12 Oct 2017 17:37:21 +0100 Subject: [PATCH 08/53] Don't close file prematurely --- synapse/rest/media/v1/media_repository.py | 22 ++++++++++++++----- synapse/rest/media/v1/preview_url_resource.py | 4 ++-- 2 files changed, 18 insertions(+), 8 deletions(-) diff --git a/synapse/rest/media/v1/media_repository.py b/synapse/rest/media/v1/media_repository.py index 398e973ca9..63ed1c4268 100644 --- a/synapse/rest/media/v1/media_repository.py +++ b/synapse/rest/media/v1/media_repository.py @@ -97,16 +97,20 @@ class MediaRepository(object): os.makedirs(dirname) @staticmethod - def write_file_synchronously(source, fname): + def _write_file_synchronously(source, fname): source.seek(0) # Ensure we read from the start of the file with open(fname, "wb") as f: shutil.copyfileobj(source, f) + source.close() + @defer.inlineCallbacks def write_to_file(self, source, path): """Write `source` to the on disk media store, and also the backup store if configured. + Will close source once finished. + Args: source: A file like object that should be written path: Relative path to write file to @@ -120,7 +124,7 @@ class MediaRepository(object): # Write to the main repository yield preserve_context_over_fn( threads.deferToThread, - self.write_file_synchronously, source, fname, + self._write_file_synchronously, source, fname, ) # Write to backup repository @@ -130,6 +134,10 @@ class MediaRepository(object): @defer.inlineCallbacks def copy_to_backup(self, source, path): + """Copy file like object source to the backup media store, if configured. + + Will close source after its done. + """ if self.backup_base_path: backup_fname = os.path.join(self.backup_base_path, path) self._makedirs(backup_fname) @@ -139,12 +147,14 @@ class MediaRepository(object): if self.synchronous_backup_media_store: yield preserve_context_over_fn( threads.deferToThread, - self.write_file_synchronously, source, backup_fname, + self._write_file_synchronously, source, backup_fname, ) else: preserve_fn(threads.deferToThread)( - self.write_file_synchronously, source, backup_fname, + self._write_file_synchronously, source, backup_fname, ) + else: + source.close() @defer.inlineCallbacks def create_content(self, media_type, upload_name, content, content_length, @@ -248,8 +258,8 @@ class MediaRepository(object): server_name, media_id) raise SynapseError(502, "Failed to fetch remote media") - with open(fname) as f: - yield self.copy_to_backup(f, fpath) + # Will close the file after its done + yield self.copy_to_backup(open(fname), fpath) media_type = headers["Content-Type"][0] time_now_ms = self.clock.time_msec() diff --git a/synapse/rest/media/v1/preview_url_resource.py b/synapse/rest/media/v1/preview_url_resource.py index f82b8fbc51..a3288c9cc6 100644 --- a/synapse/rest/media/v1/preview_url_resource.py +++ b/synapse/rest/media/v1/preview_url_resource.py @@ -275,8 +275,8 @@ class PreviewUrlResource(Resource): ) # FIXME: pass through 404s and other error messages nicely - with open(fname) as f: - yield self.media_repo.copy_to_backup(f, fpath) + # Will close the file after its done + yield self.media_repo.copy_to_backup(open(fname), fpath) media_type = headers["Content-Type"][0] time_now_ms = self.clock.time_msec() From 1259a76047a0a718ce0c9fb26513c9127f8ea919 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 12 Oct 2017 17:39:23 +0100 Subject: [PATCH 09/53] Get len before close --- synapse/rest/media/v1/media_repository.py | 17 +++++++++++------ 1 file changed, 11 insertions(+), 6 deletions(-) diff --git a/synapse/rest/media/v1/media_repository.py b/synapse/rest/media/v1/media_repository.py index 63ed1c4268..d25b98db45 100644 --- a/synapse/rest/media/v1/media_repository.py +++ b/synapse/rest/media/v1/media_repository.py @@ -359,6 +359,8 @@ class MediaRepository(object): ) if t_byte_source: + t_len = len(t_byte_source.getvalue()) + output_path = yield self.write_to_file( t_byte_source, self.filepaths.local_media_thumbnail_rel( @@ -368,8 +370,7 @@ class MediaRepository(object): logger.info("Stored thumbnail in file %r", output_path) yield self.store.store_local_thumbnail_rel( - media_id, t_width, t_height, t_type, t_method, - len(t_byte_source.getvalue()) + media_id, t_width, t_height, t_type, t_method, t_len ) defer.returnValue(output_path) @@ -387,6 +388,7 @@ class MediaRepository(object): ) if t_byte_source: + t_len = len(t_byte_source.getvalue()) output_path = yield self.write_to_file( t_byte_source, self.filepaths.remote_media_thumbnail_rel( @@ -397,7 +399,7 @@ class MediaRepository(object): yield self.store.store_remote_media_thumbnail_rel( server_name, media_id, file_id, - t_width, t_height, t_type, t_method, len(t_byte_source.getvalue()) + t_width, t_height, t_type, t_method, t_len ) defer.returnValue(output_path) @@ -449,11 +451,12 @@ class MediaRepository(object): media_id, t_width, t_height, t_type, t_method ) + t_len = len(t_byte_source.getvalue()) + yield self.write_to_file(t_byte_source, file_path) yield self.store.store_local_thumbnail( - media_id, t_width, t_height, t_type, t_method, - len(t_byte_source.getvalue()) + media_id, t_width, t_height, t_type, t_method, t_len ) defer.returnValue({ @@ -500,11 +503,13 @@ class MediaRepository(object): server_name, file_id, t_width, t_height, t_type, t_method ) + t_len = len(t_byte_source.getvalue()) + yield self.write_to_file(t_byte_source, file_path) yield self.store.store_remote_media_thumbnail( server_name, media_id, file_id, - t_width, t_height, t_type, t_method, len(t_byte_source.getvalue()) + t_width, t_height, t_type, t_method, t_len ) defer.returnValue({ From cc505b4b5e98ba70d8576a562fc36b03d6aa5150 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 12 Oct 2017 17:52:30 +0100 Subject: [PATCH 10/53] getvalue closes buffer --- synapse/rest/media/v1/media_repository.py | 21 ++++++++++----------- 1 file changed, 10 insertions(+), 11 deletions(-) diff --git a/synapse/rest/media/v1/media_repository.py b/synapse/rest/media/v1/media_repository.py index d25b98db45..ff2ddd2f18 100644 --- a/synapse/rest/media/v1/media_repository.py +++ b/synapse/rest/media/v1/media_repository.py @@ -258,7 +258,7 @@ class MediaRepository(object): server_name, media_id) raise SynapseError(502, "Failed to fetch remote media") - # Will close the file after its done + # Will close the file after its done yield self.copy_to_backup(open(fname), fpath) media_type = headers["Content-Type"][0] @@ -359,8 +359,6 @@ class MediaRepository(object): ) if t_byte_source: - t_len = len(t_byte_source.getvalue()) - output_path = yield self.write_to_file( t_byte_source, self.filepaths.local_media_thumbnail_rel( @@ -369,6 +367,8 @@ class MediaRepository(object): ) logger.info("Stored thumbnail in file %r", output_path) + t_len = os.path.getsize(output_path) + yield self.store.store_local_thumbnail_rel( media_id, t_width, t_height, t_type, t_method, t_len ) @@ -388,7 +388,6 @@ class MediaRepository(object): ) if t_byte_source: - t_len = len(t_byte_source.getvalue()) output_path = yield self.write_to_file( t_byte_source, self.filepaths.remote_media_thumbnail_rel( @@ -397,7 +396,9 @@ class MediaRepository(object): ) logger.info("Stored thumbnail in file %r", output_path) - yield self.store.store_remote_media_thumbnail_rel( + t_len = os.path.getsize(output_path) + + yield self.store.store_remote_media_thumbnail( server_name, media_id, file_id, t_width, t_height, t_type, t_method, t_len ) @@ -451,9 +452,8 @@ class MediaRepository(object): media_id, t_width, t_height, t_type, t_method ) - t_len = len(t_byte_source.getvalue()) - - yield self.write_to_file(t_byte_source, file_path) + output_path = yield self.write_to_file(t_byte_source, file_path) + t_len = os.path.getsize(output_path) yield self.store.store_local_thumbnail( media_id, t_width, t_height, t_type, t_method, t_len @@ -503,9 +503,8 @@ class MediaRepository(object): server_name, file_id, t_width, t_height, t_type, t_method ) - t_len = len(t_byte_source.getvalue()) - - yield self.write_to_file(t_byte_source, file_path) + output_path = yield self.write_to_file(t_byte_source, file_path) + t_len = os.path.getsize(output_path) yield self.store.store_remote_media_thumbnail( server_name, media_id, file_id, From 4ae85ae12190015595f979f1a302ee608de6fd65 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 12 Oct 2017 17:57:31 +0100 Subject: [PATCH 11/53] Don't close prematurely.. --- synapse/rest/media/v1/media_repository.py | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/synapse/rest/media/v1/media_repository.py b/synapse/rest/media/v1/media_repository.py index ff2ddd2f18..80b14a6739 100644 --- a/synapse/rest/media/v1/media_repository.py +++ b/synapse/rest/media/v1/media_repository.py @@ -97,12 +97,13 @@ class MediaRepository(object): os.makedirs(dirname) @staticmethod - def _write_file_synchronously(source, fname): + def _write_file_synchronously(source, fname, close_source=False): source.seek(0) # Ensure we read from the start of the file with open(fname, "wb") as f: shutil.copyfileobj(source, f) - source.close() + if close_source: + source.close() @defer.inlineCallbacks def write_to_file(self, source, path): @@ -148,10 +149,12 @@ class MediaRepository(object): yield preserve_context_over_fn( threads.deferToThread, self._write_file_synchronously, source, backup_fname, + close_source=True, ) else: preserve_fn(threads.deferToThread)( self._write_file_synchronously, source, backup_fname, + close_source=True, ) else: source.close() From d76621a47b7b4b778055760d43df9d02614dac19 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 12 Oct 2017 18:16:25 +0100 Subject: [PATCH 12/53] Fix comments --- synapse/rest/media/v1/filepath.py | 2 +- synapse/rest/media/v1/preview_url_resource.py | 3 +++ 2 files changed, 4 insertions(+), 1 deletion(-) diff --git a/synapse/rest/media/v1/filepath.py b/synapse/rest/media/v1/filepath.py index 43d0eea00d..6923a3fbd3 100644 --- a/synapse/rest/media/v1/filepath.py +++ b/synapse/rest/media/v1/filepath.py @@ -35,7 +35,7 @@ def _wrap_in_base_path(func): class MediaFilePaths(object): """Describes where files are stored on disk. - Most of the function have a `*_rel` variant which returns a file path that + Most of the functions have a `*_rel` variant which returns a file path that is relative to the base media store path. This is mainly used when we want to write to the backup media store (when one is configured) """ diff --git a/synapse/rest/media/v1/preview_url_resource.py b/synapse/rest/media/v1/preview_url_resource.py index a3288c9cc6..e986e855a7 100644 --- a/synapse/rest/media/v1/preview_url_resource.py +++ b/synapse/rest/media/v1/preview_url_resource.py @@ -343,6 +343,9 @@ class PreviewUrlResource(Resource): def _expire_url_cache_data(self): """Clean up expired url cache content, media and thumbnails. """ + + # TODO: Delete from backup media store + now = self.clock.time_msec() # First we delete expired url cache entries From b60859d6cc429ea1934b94a8749caadd9a96ee21 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Fri, 13 Oct 2017 10:24:19 +0100 Subject: [PATCH 13/53] Use make_deferred_yieldable --- synapse/rest/media/v1/media_repository.py | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/synapse/rest/media/v1/media_repository.py b/synapse/rest/media/v1/media_repository.py index 80b14a6739..5c5020fe9d 100644 --- a/synapse/rest/media/v1/media_repository.py +++ b/synapse/rest/media/v1/media_repository.py @@ -33,7 +33,7 @@ from synapse.api.errors import SynapseError, HttpResponseException, \ from synapse.util.async import Linearizer from synapse.util.stringutils import is_ascii -from synapse.util.logcontext import preserve_context_over_fn, preserve_fn +from synapse.util.logcontext import make_deferred_yieldable, preserve_fn from synapse.util.retryutils import NotRetryingDestination import os @@ -123,7 +123,7 @@ class MediaRepository(object): self._makedirs(fname) # Write to the main repository - yield preserve_context_over_fn( + yield make_deferred_yieldable( threads.deferToThread, self._write_file_synchronously, source, fname, ) @@ -146,7 +146,7 @@ class MediaRepository(object): # We can either wait for successful writing to the backup repository # or write in the background and immediately return if self.synchronous_backup_media_store: - yield preserve_context_over_fn( + yield make_deferred_yieldable( threads.deferToThread, self._write_file_synchronously, source, backup_fname, close_source=True, @@ -355,7 +355,7 @@ class MediaRepository(object): input_path = self.filepaths.local_media_filepath(media_id) thumbnailer = Thumbnailer(input_path) - t_byte_source = yield preserve_context_over_fn( + t_byte_source = yield make_deferred_yieldable( threads.deferToThread, self._generate_thumbnail, thumbnailer, t_width, t_height, t_method, t_type @@ -384,7 +384,7 @@ class MediaRepository(object): input_path = self.filepaths.remote_media_filepath(server_name, file_id) thumbnailer = Thumbnailer(input_path) - t_byte_source = yield preserve_context_over_fn( + t_byte_source = yield make_deferred_yieldable( threads.deferToThread, self._generate_thumbnail, thumbnailer, t_width, t_height, t_method, t_type @@ -443,7 +443,7 @@ class MediaRepository(object): r_width, r_height, r_method, r_type, t_byte_source )) - yield preserve_context_over_fn(threads.deferToThread, generate_thumbnails) + yield make_deferred_yieldable(threads.deferToThread, generate_thumbnails) for t_width, t_height, t_method, t_type, t_byte_source in local_thumbnails: if url_cache: @@ -499,7 +499,7 @@ class MediaRepository(object): r_width, r_height, r_method, r_type, t_byte_source )) - yield preserve_context_over_fn(threads.deferToThread, generate_thumbnails) + yield make_deferred_yieldable(threads.deferToThread, generate_thumbnails) for t_width, t_height, t_method, t_type, t_byte_source in remote_thumbnails: file_path = self.filepaths.remote_media_thumbnail_rel( From 64db043a71238db3f65f575c40f29260b83145be Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Fri, 13 Oct 2017 10:25:01 +0100 Subject: [PATCH 14/53] Move makedirs to thread --- synapse/rest/media/v1/media_repository.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/synapse/rest/media/v1/media_repository.py b/synapse/rest/media/v1/media_repository.py index 5c5020fe9d..72aad221a8 100644 --- a/synapse/rest/media/v1/media_repository.py +++ b/synapse/rest/media/v1/media_repository.py @@ -98,6 +98,7 @@ class MediaRepository(object): @staticmethod def _write_file_synchronously(source, fname, close_source=False): + MediaRepository._makedirs(fname) source.seek(0) # Ensure we read from the start of the file with open(fname, "wb") as f: shutil.copyfileobj(source, f) @@ -120,7 +121,6 @@ class MediaRepository(object): string: the file path written to in the primary media store """ fname = os.path.join(self.primary_base_path, path) - self._makedirs(fname) # Write to the main repository yield make_deferred_yieldable( @@ -141,7 +141,6 @@ class MediaRepository(object): """ if self.backup_base_path: backup_fname = os.path.join(self.backup_base_path, path) - self._makedirs(backup_fname) # We can either wait for successful writing to the backup repository # or write in the background and immediately return From 35332298ef6a9828aa1fdb10f59230f47763084e Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Fri, 13 Oct 2017 10:39:32 +0100 Subject: [PATCH 15/53] Fix up comments --- synapse/rest/media/v1/media_repository.py | 28 +++++++++++++++++++++-- 1 file changed, 26 insertions(+), 2 deletions(-) diff --git a/synapse/rest/media/v1/media_repository.py b/synapse/rest/media/v1/media_repository.py index 72aad221a8..f3a5b19a80 100644 --- a/synapse/rest/media/v1/media_repository.py +++ b/synapse/rest/media/v1/media_repository.py @@ -98,6 +98,14 @@ class MediaRepository(object): @staticmethod def _write_file_synchronously(source, fname, close_source=False): + """Write `source` to the path `fname` synchronously. Should be called + from a thread. + + Args: + source: A file like object to be written + fname (str): Path to write to + close_source (bool): Whether to close source after writing + """ MediaRepository._makedirs(fname) source.seek(0) # Ensure we read from the start of the file with open(fname, "wb") as f: @@ -115,10 +123,10 @@ class MediaRepository(object): Args: source: A file like object that should be written - path: Relative path to write file to + path(str): Relative path to write file to Returns: - string: the file path written to in the primary media store + Deferred[str]: the file path written to in the primary media store """ fname = os.path.join(self.primary_base_path, path) @@ -138,6 +146,10 @@ class MediaRepository(object): """Copy file like object source to the backup media store, if configured. Will close source after its done. + + Args: + source: A file like object that should be written + path(str): Relative path to write file to """ if self.backup_base_path: backup_fname = os.path.join(self.backup_base_path, path) @@ -161,6 +173,18 @@ class MediaRepository(object): @defer.inlineCallbacks def create_content(self, media_type, upload_name, content, content_length, auth_user): + """Store uploaded content for a local user and return the mxc URL + + Args: + media_type(str): The content type of the file + upload_name(str): The name of the file + content: A file like object that is the content to store + content_length(int): The length of the content + auth_user(str): The user_id of the uploader + + Returns: + Deferred[str]: The mxc url of the stored content + """ media_id = random_string(24) fname = yield self.write_to_file( From e3428d26ca5a23a3dac6d106aff8ac19f9839f32 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Fri, 13 Oct 2017 10:39:59 +0100 Subject: [PATCH 16/53] Fix typo --- synapse/rest/media/v1/media_repository.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/synapse/rest/media/v1/media_repository.py b/synapse/rest/media/v1/media_repository.py index f3a5b19a80..76220a5531 100644 --- a/synapse/rest/media/v1/media_repository.py +++ b/synapse/rest/media/v1/media_repository.py @@ -395,7 +395,7 @@ class MediaRepository(object): t_len = os.path.getsize(output_path) - yield self.store.store_local_thumbnail_rel( + yield self.store.store_local_thumbnail( media_id, t_width, t_height, t_type, t_method, t_len ) From 505371414f6ba9aeaa95eb8d34f7893c4cc2b07e Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Fri, 13 Oct 2017 11:23:53 +0100 Subject: [PATCH 17/53] Fix up thumbnailing function --- synapse/rest/media/v1/media_repository.py | 127 ++++++++---------- synapse/rest/media/v1/preview_url_resource.py | 8 +- synapse/rest/media/v1/thumbnailer.py | 13 +- 3 files changed, 73 insertions(+), 75 deletions(-) diff --git a/synapse/rest/media/v1/media_repository.py b/synapse/rest/media/v1/media_repository.py index 76220a5531..36f42c73be 100644 --- a/synapse/rest/media/v1/media_repository.py +++ b/synapse/rest/media/v1/media_repository.py @@ -206,7 +206,7 @@ class MediaRepository(object): "media_length": content_length, } - yield self._generate_local_thumbnails(media_id, media_info) + yield self._generate_thumbnails(None, media_id, media_info) defer.returnValue("mxc://%s/%s" % (self.server_name, media_id)) @@ -339,7 +339,7 @@ class MediaRepository(object): "filesystem_id": file_id, } - yield self._generate_remote_thumbnails( + yield self._generate_thumbnails( server_name, media_id, media_info ) @@ -385,6 +385,8 @@ class MediaRepository(object): ) if t_byte_source: + t_width, t_height = t_byte_source.dimensions + output_path = yield self.write_to_file( t_byte_source, self.filepaths.local_media_thumbnail_rel( @@ -414,6 +416,8 @@ class MediaRepository(object): ) if t_byte_source: + t_width, t_height = t_byte_source.dimensions + output_path = yield self.write_to_file( t_byte_source, self.filepaths.remote_media_thumbnail_rel( @@ -432,13 +436,28 @@ class MediaRepository(object): defer.returnValue(output_path) @defer.inlineCallbacks - def _generate_local_thumbnails(self, media_id, media_info, url_cache=False): + def _generate_thumbnails(self, server_name, media_id, media_info, url_cache=False): + """Generate and store thumbnails for an image. + + Args: + server_name(str|None): The server name if remote media, else None if local + media_id(str) + media_info(dict) + url_cache(bool): If we are thumbnailing images downloaded for the URL cache, + used exclusively by the url previewer + + Returns: + Deferred[dict]: Dict with "width" and "height" keys of original image + """ media_type = media_info["media_type"] + file_id = media_info.get("filesystem_id") requirements = self._get_thumbnail_requirements(media_type) if not requirements: return - if url_cache: + if server_name: + input_path = self.filepaths.remote_media_filepath(server_name, file_id) + elif url_cache: input_path = self.filepaths.url_cache_filepath(media_id) else: input_path = self.filepaths.local_media_filepath(media_id) @@ -454,22 +473,40 @@ class MediaRepository(object): ) return - local_thumbnails = [] + # We deduplicate the thumbnail sizes by ignoring the cropped versions if + # they have the same dimensions of a scaled one. + thumbnails = {} + for r_width, r_height, r_method, r_type in requirements: + if r_method == "crop": + thumbnails.setdefault[(r_width, r_height)] = (r_method, r_type) + elif r_method == "scale": + t_width, t_height = thumbnailer.aspect(t_width, t_height) + t_width = min(m_width, t_width) + t_height = min(m_height, t_height) + thumbnails[(t_width, t_height)] = (r_method, r_type) - def generate_thumbnails(): - for r_width, r_height, r_method, r_type in requirements: - t_byte_source = self._generate_thumbnail( - thumbnailer, r_width, r_height, r_method, r_type, + # Now we generate the thumbnails for each dimension, store it + for (r_width, r_height), (r_method, r_type) in thumbnails.iteritems(): + t_byte_source = thumbnailer.crop(t_width, t_height, t_type) + + if r_type == "crop": + t_byte_source = yield make_deferred_yieldable( + threads.deferToThread, thumbnailer.crop, + r_width, r_height, r_type, + ) + else: + t_byte_source = yield make_deferred_yieldable( + threads.deferToThread, thumbnailer.scale, + r_width, r_height, r_type, ) - local_thumbnails.append(( - r_width, r_height, r_method, r_type, t_byte_source - )) + t_width, t_height = t_byte_source.dimensions - yield make_deferred_yieldable(threads.deferToThread, generate_thumbnails) - - for t_width, t_height, t_method, t_type, t_byte_source in local_thumbnails: - if url_cache: + if server_name: + file_path = self.filepaths.remote_media_thumbnail_rel( + server_name, file_id, t_width, t_height, t_type, t_method + ) + elif url_cache: file_path = self.filepaths.url_cache_thumbnail_rel( media_id, t_width, t_height, t_type, t_method ) @@ -481,61 +518,15 @@ class MediaRepository(object): output_path = yield self.write_to_file(t_byte_source, file_path) t_len = os.path.getsize(output_path) - yield self.store.store_local_thumbnail( - media_id, t_width, t_height, t_type, t_method, t_len - ) - - defer.returnValue({ - "width": m_width, - "height": m_height, - }) - - @defer.inlineCallbacks - def _generate_remote_thumbnails(self, server_name, media_id, media_info): - media_type = media_info["media_type"] - file_id = media_info["filesystem_id"] - requirements = self._get_thumbnail_requirements(media_type) - if not requirements: - return - - remote_thumbnails = [] - - input_path = self.filepaths.remote_media_filepath(server_name, file_id) - thumbnailer = Thumbnailer(input_path) - m_width = thumbnailer.width - m_height = thumbnailer.height - - def generate_thumbnails(): - if m_width * m_height >= self.max_image_pixels: - logger.info( - "Image too large to thumbnail %r x %r > %r", - m_width, m_height, self.max_image_pixels - ) - return - - for r_width, r_height, r_method, r_type in requirements: - t_byte_source = self._generate_thumbnail( - thumbnailer, r_width, r_height, r_method, r_type, - ) - - remote_thumbnails.append(( - r_width, r_height, r_method, r_type, t_byte_source - )) - - yield make_deferred_yieldable(threads.deferToThread, generate_thumbnails) - - for t_width, t_height, t_method, t_type, t_byte_source in remote_thumbnails: - file_path = self.filepaths.remote_media_thumbnail_rel( - server_name, file_id, t_width, t_height, t_type, t_method - ) - - output_path = yield self.write_to_file(t_byte_source, file_path) - t_len = os.path.getsize(output_path) - - yield self.store.store_remote_media_thumbnail( + if server_name: + yield self.store.store_remote_media_thumbnail( server_name, media_id, file_id, t_width, t_height, t_type, t_method, t_len ) + else: + yield self.store.store_local_thumbnail( + media_id, t_width, t_height, t_type, t_method, t_len + ) defer.returnValue({ "width": m_width, diff --git a/synapse/rest/media/v1/preview_url_resource.py b/synapse/rest/media/v1/preview_url_resource.py index e986e855a7..c734f6b7cd 100644 --- a/synapse/rest/media/v1/preview_url_resource.py +++ b/synapse/rest/media/v1/preview_url_resource.py @@ -171,8 +171,8 @@ class PreviewUrlResource(Resource): logger.debug("got media_info of '%s'" % media_info) if _is_media(media_info['media_type']): - dims = yield self.media_repo._generate_local_thumbnails( - media_info['filesystem_id'], media_info, url_cache=True, + dims = yield self.media_repo._generate_thumbnails( + None, media_info['filesystem_id'], media_info, url_cache=True, ) og = { @@ -217,8 +217,8 @@ class PreviewUrlResource(Resource): if _is_media(image_info['media_type']): # TODO: make sure we don't choke on white-on-transparent images - dims = yield self.media_repo._generate_local_thumbnails( - image_info['filesystem_id'], image_info, url_cache=True, + dims = yield self.media_repo._generate_thumbnails( + None, image_info['filesystem_id'], image_info, url_cache=True, ) if dims: og["og:image:width"] = dims['width'] diff --git a/synapse/rest/media/v1/thumbnailer.py b/synapse/rest/media/v1/thumbnailer.py index e1ee535b9a..09650bd527 100644 --- a/synapse/rest/media/v1/thumbnailer.py +++ b/synapse/rest/media/v1/thumbnailer.py @@ -54,7 +54,7 @@ class Thumbnailer(object): """Rescales the image to the given dimensions. Returns: - BytesIO: the bytes of the encoded image ready to be written to disk + ImageIO: the bytes of the encoded image ready to be written to disk """ scaled = self.image.resize((width, height), Image.ANTIALIAS) return self._encode_image(scaled, output_type) @@ -71,7 +71,7 @@ class Thumbnailer(object): max_height: The larget possible height. Returns: - BytesIO: the bytes of the encoded image ready to be written to disk + ImageIO: the bytes of the encoded image ready to be written to disk """ if width * self.height > height * self.width: scaled_height = (width * self.height) // self.width @@ -92,6 +92,13 @@ class Thumbnailer(object): return self._encode_image(cropped, output_type) def _encode_image(self, output_image, output_type): - output_bytes_io = BytesIO() + output_bytes_io = ImageIO(output_image.size) output_image.save(output_bytes_io, self.FORMATS[output_type], quality=80) + output_image.close() return output_bytes_io + + +class ImageIO(BytesIO): + def __init__(self, dimensions): + super(ImageIO, self).__init__() + self.dimensions = dimensions From 0e28281a021101ac199cbf2d0d130190110921bb Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Fri, 13 Oct 2017 11:33:49 +0100 Subject: [PATCH 18/53] Fix up --- synapse/rest/media/v1/media_repository.py | 62 +++++++++++------------ synapse/rest/media/v1/thumbnailer.py | 13 ++--- 2 files changed, 32 insertions(+), 43 deletions(-) diff --git a/synapse/rest/media/v1/media_repository.py b/synapse/rest/media/v1/media_repository.py index 36f42c73be..a310d08f5f 100644 --- a/synapse/rest/media/v1/media_repository.py +++ b/synapse/rest/media/v1/media_repository.py @@ -131,10 +131,9 @@ class MediaRepository(object): fname = os.path.join(self.primary_base_path, path) # Write to the main repository - yield make_deferred_yieldable( - threads.deferToThread, + yield make_deferred_yieldable(threads.deferToThread( self._write_file_synchronously, source, fname, - ) + )) # Write to backup repository yield self.copy_to_backup(source, path) @@ -157,11 +156,10 @@ class MediaRepository(object): # We can either wait for successful writing to the backup repository # or write in the background and immediately return if self.synchronous_backup_media_store: - yield make_deferred_yieldable( - threads.deferToThread, + yield make_deferred_yieldable(threads.deferToThread( self._write_file_synchronously, source, backup_fname, close_source=True, - ) + )) else: preserve_fn(threads.deferToThread)( self._write_file_synchronously, source, backup_fname, @@ -378,11 +376,10 @@ class MediaRepository(object): input_path = self.filepaths.local_media_filepath(media_id) thumbnailer = Thumbnailer(input_path) - t_byte_source = yield make_deferred_yieldable( - threads.deferToThread, + t_byte_source = yield make_deferred_yieldable(threads.deferToThread( self._generate_thumbnail, thumbnailer, t_width, t_height, t_method, t_type - ) + )) if t_byte_source: t_width, t_height = t_byte_source.dimensions @@ -409,11 +406,10 @@ class MediaRepository(object): input_path = self.filepaths.remote_media_filepath(server_name, file_id) thumbnailer = Thumbnailer(input_path) - t_byte_source = yield make_deferred_yieldable( - threads.deferToThread, + t_byte_source = yield make_deferred_yieldable(threads.deferToThread( self._generate_thumbnail, thumbnailer, t_width, t_height, t_method, t_type - ) + )) if t_byte_source: t_width, t_height = t_byte_source.dimensions @@ -478,34 +474,32 @@ class MediaRepository(object): thumbnails = {} for r_width, r_height, r_method, r_type in requirements: if r_method == "crop": - thumbnails.setdefault[(r_width, r_height)] = (r_method, r_type) + thumbnails.setdefault((r_width, r_height), (r_method, r_type)) elif r_method == "scale": - t_width, t_height = thumbnailer.aspect(t_width, t_height) + t_width, t_height = thumbnailer.aspect(r_width, r_height) t_width = min(m_width, t_width) t_height = min(m_height, t_height) thumbnails[(t_width, t_height)] = (r_method, r_type) # Now we generate the thumbnails for each dimension, store it - for (r_width, r_height), (r_method, r_type) in thumbnails.iteritems(): - t_byte_source = thumbnailer.crop(t_width, t_height, t_type) - - if r_type == "crop": - t_byte_source = yield make_deferred_yieldable( - threads.deferToThread, thumbnailer.crop, - r_width, r_height, r_type, - ) + for (t_width, t_height), (t_method, t_type) in thumbnails.iteritems(): + # Generate the thumbnail + if t_type == "crop": + t_byte_source = yield make_deferred_yieldable(threads.deferToThread( + thumbnailer.crop, + r_width, r_height, t_type, + )) else: - t_byte_source = yield make_deferred_yieldable( - threads.deferToThread, thumbnailer.scale, - r_width, r_height, r_type, - ) - - t_width, t_height = t_byte_source.dimensions + t_byte_source = yield make_deferred_yieldable(threads.deferToThread( + thumbnailer.scale, + r_width, r_height, t_type, + )) + # Work out the correct file name for thumbnail if server_name: file_path = self.filepaths.remote_media_thumbnail_rel( - server_name, file_id, t_width, t_height, t_type, t_method - ) + server_name, file_id, t_width, t_height, t_type, t_method + ) elif url_cache: file_path = self.filepaths.url_cache_thumbnail_rel( media_id, t_width, t_height, t_type, t_method @@ -515,14 +509,16 @@ class MediaRepository(object): media_id, t_width, t_height, t_type, t_method ) + # Write to disk output_path = yield self.write_to_file(t_byte_source, file_path) t_len = os.path.getsize(output_path) + # Write to database if server_name: yield self.store.store_remote_media_thumbnail( - server_name, media_id, file_id, - t_width, t_height, t_type, t_method, t_len - ) + server_name, media_id, file_id, + t_width, t_height, t_type, t_method, t_len + ) else: yield self.store.store_local_thumbnail( media_id, t_width, t_height, t_type, t_method, t_len diff --git a/synapse/rest/media/v1/thumbnailer.py b/synapse/rest/media/v1/thumbnailer.py index 09650bd527..e1ee535b9a 100644 --- a/synapse/rest/media/v1/thumbnailer.py +++ b/synapse/rest/media/v1/thumbnailer.py @@ -54,7 +54,7 @@ class Thumbnailer(object): """Rescales the image to the given dimensions. Returns: - ImageIO: the bytes of the encoded image ready to be written to disk + BytesIO: the bytes of the encoded image ready to be written to disk """ scaled = self.image.resize((width, height), Image.ANTIALIAS) return self._encode_image(scaled, output_type) @@ -71,7 +71,7 @@ class Thumbnailer(object): max_height: The larget possible height. Returns: - ImageIO: the bytes of the encoded image ready to be written to disk + BytesIO: the bytes of the encoded image ready to be written to disk """ if width * self.height > height * self.width: scaled_height = (width * self.height) // self.width @@ -92,13 +92,6 @@ class Thumbnailer(object): return self._encode_image(cropped, output_type) def _encode_image(self, output_image, output_type): - output_bytes_io = ImageIO(output_image.size) + output_bytes_io = BytesIO() output_image.save(output_bytes_io, self.FORMATS[output_type], quality=80) - output_image.close() return output_bytes_io - - -class ImageIO(BytesIO): - def __init__(self, dimensions): - super(ImageIO, self).__init__() - self.dimensions = dimensions From 9732ec6797339dd33bc472cef5081a858ddccb30 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Fri, 13 Oct 2017 11:34:41 +0100 Subject: [PATCH 19/53] s/write_to_file/write_to_file_and_backup/ --- synapse/rest/media/v1/media_repository.py | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/synapse/rest/media/v1/media_repository.py b/synapse/rest/media/v1/media_repository.py index a310d08f5f..c9753ebb52 100644 --- a/synapse/rest/media/v1/media_repository.py +++ b/synapse/rest/media/v1/media_repository.py @@ -115,7 +115,7 @@ class MediaRepository(object): source.close() @defer.inlineCallbacks - def write_to_file(self, source, path): + def write_to_file_and_backup(self, source, path): """Write `source` to the on disk media store, and also the backup store if configured. @@ -185,7 +185,7 @@ class MediaRepository(object): """ media_id = random_string(24) - fname = yield self.write_to_file( + fname = yield self.write_to_file_and_backup( content, self.filepaths.local_media_filepath_rel(media_id) ) @@ -384,7 +384,7 @@ class MediaRepository(object): if t_byte_source: t_width, t_height = t_byte_source.dimensions - output_path = yield self.write_to_file( + output_path = yield self.write_to_file_and_backup( t_byte_source, self.filepaths.local_media_thumbnail_rel( media_id, t_width, t_height, t_type, t_method @@ -414,7 +414,7 @@ class MediaRepository(object): if t_byte_source: t_width, t_height = t_byte_source.dimensions - output_path = yield self.write_to_file( + output_path = yield self.write_to_file_and_backup( t_byte_source, self.filepaths.remote_media_thumbnail_rel( server_name, file_id, t_width, t_height, t_type, t_method @@ -510,7 +510,7 @@ class MediaRepository(object): ) # Write to disk - output_path = yield self.write_to_file(t_byte_source, file_path) + output_path = yield self.write_to_file_and_backup(t_byte_source, file_path) t_len = os.path.getsize(output_path) # Write to database From ae5d18617afd98bb5d51b43c2a12a99e9d96da39 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Fri, 13 Oct 2017 11:35:44 +0100 Subject: [PATCH 20/53] Make things be absolute paths again --- synapse/rest/media/v1/filepath.py | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/synapse/rest/media/v1/filepath.py b/synapse/rest/media/v1/filepath.py index 6923a3fbd3..a3a15ac302 100644 --- a/synapse/rest/media/v1/filepath.py +++ b/synapse/rest/media/v1/filepath.py @@ -172,12 +172,12 @@ class MediaFilePaths(object): if NEW_FORMAT_ID_RE.match(media_id): return os.path.join( - "url_cache_thumbnails", + self.primary_base_path, "url_cache_thumbnails", media_id[:10], media_id[11:], ) else: return os.path.join( - "url_cache_thumbnails", + self.primary_base_path, "url_cache_thumbnails", media_id[0:2], media_id[2:4], media_id[4:], ) @@ -188,26 +188,26 @@ class MediaFilePaths(object): if NEW_FORMAT_ID_RE.match(media_id): return [ os.path.join( - "url_cache_thumbnails", + self.primary_base_path, "url_cache_thumbnails", media_id[:10], media_id[11:], ), os.path.join( - "url_cache_thumbnails", + self.primary_base_path, "url_cache_thumbnails", media_id[:10], ), ] else: return [ os.path.join( - "url_cache_thumbnails", + self.primary_base_path, "url_cache_thumbnails", media_id[0:2], media_id[2:4], media_id[4:], ), os.path.join( - "url_cache_thumbnails", + self.primary_base_path, "url_cache_thumbnails", media_id[0:2], media_id[2:4], ), os.path.join( - "url_cache_thumbnails", + self.primary_base_path, "url_cache_thumbnails", media_id[0:2], ), ] From 4d7e1dde70e6f2300ab83fb3208152f3d73bde71 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Fri, 13 Oct 2017 11:36:32 +0100 Subject: [PATCH 21/53] Remove unnecessary diff --- synapse/rest/media/v1/media_repository.py | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/synapse/rest/media/v1/media_repository.py b/synapse/rest/media/v1/media_repository.py index c9753ebb52..f06813c48c 100644 --- a/synapse/rest/media/v1/media_repository.py +++ b/synapse/rest/media/v1/media_repository.py @@ -63,9 +63,7 @@ class MediaRepository(object): self.primary_base_path = hs.config.media_store_path self.filepaths = MediaFilePaths(self.primary_base_path) - self.backup_base_path = None - if hs.config.backup_media_store_path: - self.backup_base_path = hs.config.backup_media_store_path + self.backup_base_path = hs.config.backup_media_store_path self.synchronous_backup_media_store = hs.config.synchronous_backup_media_store From a675bd08bd1a016a16bd0e10547e8c26be391ee0 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Fri, 13 Oct 2017 11:41:06 +0100 Subject: [PATCH 22/53] Add paths back in... --- synapse/rest/media/v1/filepath.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/synapse/rest/media/v1/filepath.py b/synapse/rest/media/v1/filepath.py index a3a15ac302..fec0bbf572 100644 --- a/synapse/rest/media/v1/filepath.py +++ b/synapse/rest/media/v1/filepath.py @@ -100,7 +100,7 @@ class MediaFilePaths(object): def remote_media_thumbnail_dir(self, server_name, file_id): return os.path.join( - "remote_thumbnail", server_name, + self.primary_base_path, "remote_thumbnail", server_name, file_id[0:2], file_id[2:4], file_id[4:], ) @@ -125,18 +125,18 @@ class MediaFilePaths(object): if NEW_FORMAT_ID_RE.match(media_id): return [ os.path.join( - "url_cache", + self.primary_base_path, "url_cache", media_id[:10], ), ] else: return [ os.path.join( - "url_cache", + self.primary_base_path, "url_cache", media_id[0:2], media_id[2:4], ), os.path.join( - "url_cache", + self.primary_base_path, "url_cache", media_id[0:2], ), ] From 1f43d2239757db0b376a3582066190221942cddc Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Fri, 13 Oct 2017 11:42:07 +0100 Subject: [PATCH 23/53] Don't needlessly rename variable --- synapse/rest/media/v1/filepath.py | 26 +++++++++++++------------- 1 file changed, 13 insertions(+), 13 deletions(-) diff --git a/synapse/rest/media/v1/filepath.py b/synapse/rest/media/v1/filepath.py index fec0bbf572..d5164e47e0 100644 --- a/synapse/rest/media/v1/filepath.py +++ b/synapse/rest/media/v1/filepath.py @@ -27,7 +27,7 @@ def _wrap_in_base_path(func): @functools.wraps(func) def _wrapped(self, *args, **kwargs): path = func(self, *args, **kwargs) - return os.path.join(self.primary_base_path, path) + return os.path.join(self.base_path, path) return _wrapped @@ -41,7 +41,7 @@ class MediaFilePaths(object): """ def __init__(self, primary_base_path): - self.primary_base_path = primary_base_path + self.base_path = primary_base_path def default_thumbnail_rel(self, default_top_level, default_sub_type, width, height, content_type, method): @@ -100,7 +100,7 @@ class MediaFilePaths(object): def remote_media_thumbnail_dir(self, server_name, file_id): return os.path.join( - self.primary_base_path, "remote_thumbnail", server_name, + self.base_path, "remote_thumbnail", server_name, file_id[0:2], file_id[2:4], file_id[4:], ) @@ -125,18 +125,18 @@ class MediaFilePaths(object): if NEW_FORMAT_ID_RE.match(media_id): return [ os.path.join( - self.primary_base_path, "url_cache", + self.base_path, "url_cache", media_id[:10], ), ] else: return [ os.path.join( - self.primary_base_path, "url_cache", + self.base_path, "url_cache", media_id[0:2], media_id[2:4], ), os.path.join( - self.primary_base_path, "url_cache", + self.base_path, "url_cache", media_id[0:2], ), ] @@ -172,12 +172,12 @@ class MediaFilePaths(object): if NEW_FORMAT_ID_RE.match(media_id): return os.path.join( - self.primary_base_path, "url_cache_thumbnails", + self.base_path, "url_cache_thumbnails", media_id[:10], media_id[11:], ) else: return os.path.join( - self.primary_base_path, "url_cache_thumbnails", + self.base_path, "url_cache_thumbnails", media_id[0:2], media_id[2:4], media_id[4:], ) @@ -188,26 +188,26 @@ class MediaFilePaths(object): if NEW_FORMAT_ID_RE.match(media_id): return [ os.path.join( - self.primary_base_path, "url_cache_thumbnails", + self.base_path, "url_cache_thumbnails", media_id[:10], media_id[11:], ), os.path.join( - self.primary_base_path, "url_cache_thumbnails", + self.base_path, "url_cache_thumbnails", media_id[:10], ), ] else: return [ os.path.join( - self.primary_base_path, "url_cache_thumbnails", + self.base_path, "url_cache_thumbnails", media_id[0:2], media_id[2:4], media_id[4:], ), os.path.join( - self.primary_base_path, "url_cache_thumbnails", + self.base_path, "url_cache_thumbnails", media_id[0:2], media_id[2:4], ), os.path.join( - self.primary_base_path, "url_cache_thumbnails", + self.base_path, "url_cache_thumbnails", media_id[0:2], ), ] From c021c39cbd0bf1f0a85c9699275600ac35aa9ec4 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Fri, 13 Oct 2017 13:46:53 +0100 Subject: [PATCH 24/53] Remove spurious addition --- synapse/rest/media/v1/media_repository.py | 4 ---- 1 file changed, 4 deletions(-) diff --git a/synapse/rest/media/v1/media_repository.py b/synapse/rest/media/v1/media_repository.py index f06813c48c..3b8fe5ddb4 100644 --- a/synapse/rest/media/v1/media_repository.py +++ b/synapse/rest/media/v1/media_repository.py @@ -380,8 +380,6 @@ class MediaRepository(object): )) if t_byte_source: - t_width, t_height = t_byte_source.dimensions - output_path = yield self.write_to_file_and_backup( t_byte_source, self.filepaths.local_media_thumbnail_rel( @@ -410,8 +408,6 @@ class MediaRepository(object): )) if t_byte_source: - t_width, t_height = t_byte_source.dimensions - output_path = yield self.write_to_file_and_backup( t_byte_source, self.filepaths.remote_media_thumbnail_rel( From ad1911bbf46f658ee343ee26e3011b3f1bcbd572 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Fri, 13 Oct 2017 13:47:05 +0100 Subject: [PATCH 25/53] Comment --- synapse/rest/media/v1/media_repository.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/synapse/rest/media/v1/media_repository.py b/synapse/rest/media/v1/media_repository.py index 3b8fe5ddb4..700fd0dd24 100644 --- a/synapse/rest/media/v1/media_repository.py +++ b/synapse/rest/media/v1/media_repository.py @@ -121,7 +121,7 @@ class MediaRepository(object): Args: source: A file like object that should be written - path(str): Relative path to write file to + path (str): Relative path to write file to Returns: Deferred[str]: the file path written to in the primary media store From 31aa7bd8d1748548b2523f58b348bb6787dcc019 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Fri, 13 Oct 2017 13:47:38 +0100 Subject: [PATCH 26/53] Move type into key --- synapse/rest/media/v1/media_repository.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/synapse/rest/media/v1/media_repository.py b/synapse/rest/media/v1/media_repository.py index 700fd0dd24..dee834389f 100644 --- a/synapse/rest/media/v1/media_repository.py +++ b/synapse/rest/media/v1/media_repository.py @@ -468,15 +468,15 @@ class MediaRepository(object): thumbnails = {} for r_width, r_height, r_method, r_type in requirements: if r_method == "crop": - thumbnails.setdefault((r_width, r_height), (r_method, r_type)) + thumbnails.setdefault((r_width, r_height,r_type), r_method) elif r_method == "scale": t_width, t_height = thumbnailer.aspect(r_width, r_height) t_width = min(m_width, t_width) t_height = min(m_height, t_height) - thumbnails[(t_width, t_height)] = (r_method, r_type) + thumbnails[(t_width, t_height, r_type)] = r_method # Now we generate the thumbnails for each dimension, store it - for (t_width, t_height), (t_method, t_type) in thumbnails.iteritems(): + for (t_width, t_height, t_type), t_method in thumbnails.iteritems(): # Generate the thumbnail if t_type == "crop": t_byte_source = yield make_deferred_yieldable(threads.deferToThread( From 931fc43cc840f40402c78e9478cf3adac6559b27 Mon Sep 17 00:00:00 2001 From: Matthew Hodgson Date: Fri, 13 Oct 2017 13:54:19 +0100 Subject: [PATCH 27/53] fix copyright to companies which actually exist(ed) --- docs/sphinx/conf.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/sphinx/conf.py b/docs/sphinx/conf.py index 15c19834fc..06e1b3c33d 100644 --- a/docs/sphinx/conf.py +++ b/docs/sphinx/conf.py @@ -50,7 +50,7 @@ master_doc = 'index' # General information about the project. project = u'Synapse' -copyright = u'2014, TNG' +copyright = u'Copyright 2014-2017 OpenMarket, 2017 Vector Creations Ltd, 2017 New Vector Ltd' # The version info for the project you're documenting, acts as replacement for # |version| and |release|, also used in various other places throughout the From b92a8e6e4aa2283daaa4e6050f1dbd503ddc9434 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Fri, 13 Oct 2017 13:58:57 +0100 Subject: [PATCH 28/53] PEP8 --- synapse/rest/media/v1/media_repository.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/synapse/rest/media/v1/media_repository.py b/synapse/rest/media/v1/media_repository.py index dee834389f..d2ac0175d7 100644 --- a/synapse/rest/media/v1/media_repository.py +++ b/synapse/rest/media/v1/media_repository.py @@ -468,7 +468,7 @@ class MediaRepository(object): thumbnails = {} for r_width, r_height, r_method, r_type in requirements: if r_method == "crop": - thumbnails.setdefault((r_width, r_height,r_type), r_method) + thumbnails.setdefault((r_width, r_height, r_type), r_method) elif r_method == "scale": t_width, t_height = thumbnailer.aspect(r_width, r_height) t_width = min(m_width, t_width) From 2b24416e90b0bf1ee6d29cfc384670f4eeca0ced Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Fri, 13 Oct 2017 14:11:34 +0100 Subject: [PATCH 29/53] Don't reuse source but instead copy from primary media store to backup --- synapse/rest/media/v1/media_repository.py | 28 ++++++------------- synapse/rest/media/v1/preview_url_resource.py | 3 +- 2 files changed, 9 insertions(+), 22 deletions(-) diff --git a/synapse/rest/media/v1/media_repository.py b/synapse/rest/media/v1/media_repository.py index d2ac0175d7..e32a67e16a 100644 --- a/synapse/rest/media/v1/media_repository.py +++ b/synapse/rest/media/v1/media_repository.py @@ -95,7 +95,7 @@ class MediaRepository(object): os.makedirs(dirname) @staticmethod - def _write_file_synchronously(source, fname, close_source=False): + def _write_file_synchronously(source, fname): """Write `source` to the path `fname` synchronously. Should be called from a thread. @@ -109,16 +109,11 @@ class MediaRepository(object): with open(fname, "wb") as f: shutil.copyfileobj(source, f) - if close_source: - source.close() - @defer.inlineCallbacks def write_to_file_and_backup(self, source, path): """Write `source` to the on disk media store, and also the backup store if configured. - Will close source once finished. - Args: source: A file like object that should be written path (str): Relative path to write file to @@ -134,37 +129,31 @@ class MediaRepository(object): )) # Write to backup repository - yield self.copy_to_backup(source, path) + yield self.copy_to_backup(path) defer.returnValue(fname) @defer.inlineCallbacks - def copy_to_backup(self, source, path): - """Copy file like object source to the backup media store, if configured. - - Will close source after its done. + def copy_to_backup(self, path): + """Copy a file from the primary to backup media store, if configured. Args: - source: A file like object that should be written path(str): Relative path to write file to """ if self.backup_base_path: + primary_fname = os.path.join(self.primary_base_path, path) backup_fname = os.path.join(self.backup_base_path, path) # We can either wait for successful writing to the backup repository # or write in the background and immediately return if self.synchronous_backup_media_store: yield make_deferred_yieldable(threads.deferToThread( - self._write_file_synchronously, source, backup_fname, - close_source=True, + shutil.copyfile, primary_fname, backup_fname, )) else: preserve_fn(threads.deferToThread)( - self._write_file_synchronously, source, backup_fname, - close_source=True, + shutil.copyfile, primary_fname, backup_fname, ) - else: - source.close() @defer.inlineCallbacks def create_content(self, media_type, upload_name, content, content_length, @@ -280,8 +269,7 @@ class MediaRepository(object): server_name, media_id) raise SynapseError(502, "Failed to fetch remote media") - # Will close the file after its done - yield self.copy_to_backup(open(fname), fpath) + yield self.copy_to_backup(fpath) media_type = headers["Content-Type"][0] time_now_ms = self.clock.time_msec() diff --git a/synapse/rest/media/v1/preview_url_resource.py b/synapse/rest/media/v1/preview_url_resource.py index c734f6b7cd..2a3e37fdf4 100644 --- a/synapse/rest/media/v1/preview_url_resource.py +++ b/synapse/rest/media/v1/preview_url_resource.py @@ -275,8 +275,7 @@ class PreviewUrlResource(Resource): ) # FIXME: pass through 404s and other error messages nicely - # Will close the file after its done - yield self.media_repo.copy_to_backup(open(fname), fpath) + yield self.media_repo.copy_to_backup(fpath) media_type = headers["Content-Type"][0] time_now_ms = self.clock.time_msec() From 64665b57d0902c44235994d9cbf16ae61a784ab1 Mon Sep 17 00:00:00 2001 From: Matthew Hodgson Date: Fri, 13 Oct 2017 14:26:07 +0100 Subject: [PATCH 30/53] oops --- docs/sphinx/conf.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/sphinx/conf.py b/docs/sphinx/conf.py index 06e1b3c33d..0b15bd8912 100644 --- a/docs/sphinx/conf.py +++ b/docs/sphinx/conf.py @@ -50,7 +50,7 @@ master_doc = 'index' # General information about the project. project = u'Synapse' -copyright = u'Copyright 2014-2017 OpenMarket, 2017 Vector Creations Ltd, 2017 New Vector Ltd' +copyright = u'Copyright 2014-2017 OpenMarket Ltd, 2017 Vector Creations Ltd, 2017 New Vector Ltd' # The version info for the project you're documenting, acts as replacement for # |version| and |release|, also used in various other places throughout the From 6b725cf56aaf090f9cc9d5409dec7912feae8869 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Fri, 13 Oct 2017 15:23:41 +0100 Subject: [PATCH 31/53] Remove old comment --- synapse/rest/media/v1/media_repository.py | 1 - 1 file changed, 1 deletion(-) diff --git a/synapse/rest/media/v1/media_repository.py b/synapse/rest/media/v1/media_repository.py index e32a67e16a..cc267d0c16 100644 --- a/synapse/rest/media/v1/media_repository.py +++ b/synapse/rest/media/v1/media_repository.py @@ -102,7 +102,6 @@ class MediaRepository(object): Args: source: A file like object to be written fname (str): Path to write to - close_source (bool): Whether to close source after writing """ MediaRepository._makedirs(fname) source.seek(0) # Ensure we read from the start of the file From 1b6b0b1e66ab1cd5682ad1fa99020474afd6db7b Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Fri, 13 Oct 2017 15:34:08 +0100 Subject: [PATCH 32/53] Add try/finally block to close t_byte_source --- synapse/rest/media/v1/media_repository.py | 65 ++++++++++++++--------- 1 file changed, 41 insertions(+), 24 deletions(-) diff --git a/synapse/rest/media/v1/media_repository.py b/synapse/rest/media/v1/media_repository.py index cc267d0c16..515b3d3e74 100644 --- a/synapse/rest/media/v1/media_repository.py +++ b/synapse/rest/media/v1/media_repository.py @@ -367,12 +367,16 @@ class MediaRepository(object): )) if t_byte_source: - output_path = yield self.write_to_file_and_backup( - t_byte_source, - self.filepaths.local_media_thumbnail_rel( - media_id, t_width, t_height, t_type, t_method + try: + output_path = yield self.write_to_file_and_backup( + t_byte_source, + self.filepaths.local_media_thumbnail_rel( + media_id, t_width, t_height, t_type, t_method + ) ) - ) + finally: + t_byte_source.close() + logger.info("Stored thumbnail in file %r", output_path) t_len = os.path.getsize(output_path) @@ -395,12 +399,16 @@ class MediaRepository(object): )) if t_byte_source: - output_path = yield self.write_to_file_and_backup( - t_byte_source, - self.filepaths.remote_media_thumbnail_rel( - server_name, file_id, t_width, t_height, t_type, t_method + try: + output_path = yield self.write_to_file_and_backup( + t_byte_source, + self.filepaths.remote_media_thumbnail_rel( + server_name, file_id, t_width, t_height, t_type, t_method + ) ) - ) + finally: + t_byte_source.close() + logger.info("Stored thumbnail in file %r", output_path) t_len = os.path.getsize(output_path) @@ -464,18 +472,6 @@ class MediaRepository(object): # Now we generate the thumbnails for each dimension, store it for (t_width, t_height, t_type), t_method in thumbnails.iteritems(): - # Generate the thumbnail - if t_type == "crop": - t_byte_source = yield make_deferred_yieldable(threads.deferToThread( - thumbnailer.crop, - r_width, r_height, t_type, - )) - else: - t_byte_source = yield make_deferred_yieldable(threads.deferToThread( - thumbnailer.scale, - r_width, r_height, t_type, - )) - # Work out the correct file name for thumbnail if server_name: file_path = self.filepaths.remote_media_thumbnail_rel( @@ -490,8 +486,29 @@ class MediaRepository(object): media_id, t_width, t_height, t_type, t_method ) - # Write to disk - output_path = yield self.write_to_file_and_backup(t_byte_source, file_path) + # Generate the thumbnail + if t_type == "crop": + t_byte_source = yield make_deferred_yieldable(threads.deferToThread( + thumbnailer.crop, + r_width, r_height, t_type, + )) + else: + t_byte_source = yield make_deferred_yieldable(threads.deferToThread( + thumbnailer.scale, + r_width, r_height, t_type, + )) + + if not t_byte_source: + continue + + try: + # Write to disk + output_path = yield self.write_to_file_and_backup( + t_byte_source, file_path, + ) + finally: + t_byte_source.close() + t_len = os.path.getsize(output_path) # Write to database From 9342bcfce08c862ea5027e865b23bf2bd6ad3a8c Mon Sep 17 00:00:00 2001 From: David Baker Date: Mon, 16 Oct 2017 13:38:10 +0100 Subject: [PATCH 33/53] Omit the *s for @room notifications They're just redundant --- synapse/push/baserules.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/synapse/push/baserules.py b/synapse/push/baserules.py index 9dce99ebec..7a18afe5f9 100644 --- a/synapse/push/baserules.py +++ b/synapse/push/baserules.py @@ -245,7 +245,7 @@ BASE_APPEND_OVERRIDE_RULES = [ { 'kind': 'event_match', 'key': 'content.body', - 'pattern': '*@room*', + 'pattern': '@room', '_id': '_roomnotif_content', }, { From 6079d0027aa79a6e2e41254ceda42206e307add7 Mon Sep 17 00:00:00 2001 From: Luke Barnard Date: Mon, 16 Oct 2017 14:20:45 +0100 Subject: [PATCH 34/53] Log a warning when no profile for invited member And return empty profile --- synapse/handlers/groups_local.py | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/synapse/handlers/groups_local.py b/synapse/handlers/groups_local.py index 3b676d46bd..97a20f2b04 100644 --- a/synapse/handlers/groups_local.py +++ b/synapse/handlers/groups_local.py @@ -313,8 +313,11 @@ class GroupsLocalHandler(object): self.notifier.on_new_event( "groups_key", token, users=[user_id], ) - - user_profile = yield self.profile_handler.get_profile(user_id) + try: + user_profile = yield self.profile_handler.get_profile(user_id) + except Exception as e: + logger.warn("No profile for user %s: %s", user_id, e) + user_profile = {} defer.returnValue({"state": "invite", "user_profile": user_profile}) From 2c5972f87f0541aaeff43846f7050ab91d11cf0e Mon Sep 17 00:00:00 2001 From: Luke Barnard Date: Mon, 16 Oct 2017 15:31:11 +0100 Subject: [PATCH 35/53] Implement GET /groups/$groupId/invited_users --- synapse/federation/transport/client.py | 13 ++++++++++ synapse/federation/transport/server.py | 18 ++++++++++++- synapse/groups/groups_server.py | 35 ++++++++++++++++++++++++++ synapse/handlers/groups_local.py | 17 +++++++++++++ synapse/rest/client/v2_alpha/groups.py | 21 ++++++++++++++++ synapse/storage/group_server.py | 12 +++++++++ 6 files changed, 115 insertions(+), 1 deletion(-) diff --git a/synapse/federation/transport/client.py b/synapse/federation/transport/client.py index f96561c1fe..125d8f3598 100644 --- a/synapse/federation/transport/client.py +++ b/synapse/federation/transport/client.py @@ -550,6 +550,19 @@ class TransportLayerClient(object): ignore_backoff=True, ) + @log_function + def get_invited_users_in_group(self, destination, group_id, requester_user_id): + """Get users that have been invited to a group + """ + path = PREFIX + "/groups/%s/invited_users" % (group_id,) + + return self.client.get_json( + destination=destination, + path=path, + args={"requester_user_id": requester_user_id}, + ignore_backoff=True, + ) + @log_function def accept_group_invite(self, destination, group_id, user_id, content): """Accept a group invite diff --git a/synapse/federation/transport/server.py b/synapse/federation/transport/server.py index c7565e0737..625a2fe27f 100644 --- a/synapse/federation/transport/server.py +++ b/synapse/federation/transport/server.py @@ -720,6 +720,22 @@ class FederationGroupsUsersServlet(BaseFederationServlet): defer.returnValue((200, new_content)) +class FederationGroupsInvitedUsersServlet(BaseFederationServlet): + """Get the users that have been invited to a group + """ + PATH = "/groups/(?P[^/]*)/invited_users$" + + @defer.inlineCallbacks + def on_GET(self, origin, content, query, group_id): + requester_user_id = parse_string_from_args(query, "requester_user_id") + if get_domain_from_id(requester_user_id) != origin: + raise SynapseError(403, "requester_user_id doesn't match origin") + + new_content = yield self.handler.get_invited_users_in_group( + group_id, requester_user_id + ) + + defer.returnValue((200, new_content)) class FederationGroupsInviteServlet(BaseFederationServlet): """Ask a group server to invite someone to the group @@ -1109,12 +1125,12 @@ ROOM_LIST_CLASSES = ( PublicRoomList, ) - GROUP_SERVER_SERVLET_CLASSES = ( FederationGroupsProfileServlet, FederationGroupsSummaryServlet, FederationGroupsRoomsServlet, FederationGroupsUsersServlet, + FederationGroupsInvitedUsersServlet, FederationGroupsInviteServlet, FederationGroupsAcceptInviteServlet, FederationGroupsRemoveUserServlet, diff --git a/synapse/groups/groups_server.py b/synapse/groups/groups_server.py index 1083bc2990..bfa46b7cb2 100644 --- a/synapse/groups/groups_server.py +++ b/synapse/groups/groups_server.py @@ -420,6 +420,41 @@ class GroupsServerHandler(object): "total_user_count_estimate": len(user_results), }) + @defer.inlineCallbacks + def get_invited_users_in_group(self, group_id, requester_user_id): + """Get the users that have been invited to a group as seen by requester_user_id. + + The ordering is arbitrary at the moment + """ + + yield self.check_group_is_ours(group_id, and_exists=True) + + is_user_in_group = yield self.store.is_user_in_group(requester_user_id, group_id) + + if not is_user_in_group: + raise SynapseError(403, "User not in group") + + invited_users = yield self.store.get_invited_users_in_group(group_id) + + user_profiles = [] + + for user_id in invited_users: + user_profile = { + "user_id": user_id + } + try: + profile = yield self.profile_handler.get_profile_from_cache(user) + user_profile.update(profile) + except Exception as e: + pass + user_profiles.append(user_profile) + + defer.returnValue({ + "chunk": user_profiles, + "total_user_count_estimate": len(invited_users), + }) + + @defer.inlineCallbacks def get_rooms_in_group(self, group_id, requester_user_id): """Get the rooms in group as seen by requester_user_id diff --git a/synapse/handlers/groups_local.py b/synapse/handlers/groups_local.py index 97a20f2b04..5263e769bb 100644 --- a/synapse/handlers/groups_local.py +++ b/synapse/handlers/groups_local.py @@ -219,6 +219,23 @@ class GroupsLocalHandler(object): defer.returnValue(res) + @defer.inlineCallbacks + def get_invited_users_in_group(self, group_id, requester_user_id): + """Get users invited to a group + """ + if self.is_mine_id(group_id): + res = yield self.groups_server_handler.get_invited_users_in_group( + group_id, requester_user_id + ) + defer.returnValue(res) + + group_server_name = get_domain_from_id(group_id) + + res = yield self.transport_client.get_users_in_group( + get_domain_from_id(group_id), group_id, requester_user_id, + ) + defer.returnValue(res) + @defer.inlineCallbacks def join_group(self, group_id, user_id, content): """Request to join a group diff --git a/synapse/rest/client/v2_alpha/groups.py b/synapse/rest/client/v2_alpha/groups.py index 8f3ce15b02..4532112cfc 100644 --- a/synapse/rest/client/v2_alpha/groups.py +++ b/synapse/rest/client/v2_alpha/groups.py @@ -370,6 +370,26 @@ class GroupUsersServlet(RestServlet): defer.returnValue((200, result)) +class GroupInvitedUsersServlet(RestServlet): + """Get users invited to a group + """ + PATTERNS = client_v2_patterns("/groups/(?P[^/]*)/invited_users$") + + def __init__(self, hs): + super(GroupInvitedUsersServlet, self).__init__() + self.auth = hs.get_auth() + self.clock = hs.get_clock() + self.groups_handler = hs.get_groups_local_handler() + + @defer.inlineCallbacks + def on_GET(self, request, group_id): + requester = yield self.auth.get_user_by_req(request) + user_id = requester.user.to_string() + + result = yield self.groups_handler.get_invited_users_in_group(group_id, user_id) + + defer.returnValue((200, result)) + class GroupCreateServlet(RestServlet): """Create a group @@ -674,6 +694,7 @@ class GroupsForUserServlet(RestServlet): def register_servlets(hs, http_server): GroupServlet(hs).register(http_server) GroupSummaryServlet(hs).register(http_server) + GroupInvitedUsersServlet(hs).register(http_server) GroupUsersServlet(hs).register(http_server) GroupRoomServlet(hs).register(http_server) GroupCreateServlet(hs).register(http_server) diff --git a/synapse/storage/group_server.py b/synapse/storage/group_server.py index 3af372de59..9e63db5c6c 100644 --- a/synapse/storage/group_server.py +++ b/synapse/storage/group_server.py @@ -56,6 +56,18 @@ class GroupServerStore(SQLBaseStore): desc="get_users_in_group", ) + def get_invited_users_in_group(self, group_id): + # TODO: Pagination + + return self._simple_select_onecol( + table="group_invites", + keyvalues={ + "group_id": group_id, + }, + retcol="user_id", + desc="get_invited_users_in_group", + ) + def get_rooms_in_group(self, group_id, include_private=False): # TODO: Pagination From a3ac4f6b0ad9cce172678fa87c10a1100d16236f Mon Sep 17 00:00:00 2001 From: Luke Barnard Date: Mon, 16 Oct 2017 15:41:03 +0100 Subject: [PATCH 36/53] _create_rererouter for get_invited_users_in_group --- synapse/handlers/groups_local.py | 19 ++----------------- 1 file changed, 2 insertions(+), 17 deletions(-) diff --git a/synapse/handlers/groups_local.py b/synapse/handlers/groups_local.py index 5263e769bb..6699d0888f 100644 --- a/synapse/handlers/groups_local.py +++ b/synapse/handlers/groups_local.py @@ -68,6 +68,8 @@ class GroupsLocalHandler(object): update_group_profile = _create_rerouter("update_group_profile") get_rooms_in_group = _create_rerouter("get_rooms_in_group") + get_invited_users_in_group = _create_rerouter("get_invited_users_in_group") + add_room_to_group = _create_rerouter("add_room_to_group") remove_room_from_group = _create_rerouter("remove_room_from_group") @@ -219,23 +221,6 @@ class GroupsLocalHandler(object): defer.returnValue(res) - @defer.inlineCallbacks - def get_invited_users_in_group(self, group_id, requester_user_id): - """Get users invited to a group - """ - if self.is_mine_id(group_id): - res = yield self.groups_server_handler.get_invited_users_in_group( - group_id, requester_user_id - ) - defer.returnValue(res) - - group_server_name = get_domain_from_id(group_id) - - res = yield self.transport_client.get_users_in_group( - get_domain_from_id(group_id), group_id, requester_user_id, - ) - defer.returnValue(res) - @defer.inlineCallbacks def join_group(self, group_id, user_id, content): """Request to join a group From c43e8a97367e939cd1e0b7819095950a156c9f49 Mon Sep 17 00:00:00 2001 From: Luke Barnard Date: Mon, 16 Oct 2017 15:50:39 +0100 Subject: [PATCH 37/53] Make it work. Warn about lack of user profile --- synapse/groups/groups_server.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/synapse/groups/groups_server.py b/synapse/groups/groups_server.py index bfa46b7cb2..31fc711876 100644 --- a/synapse/groups/groups_server.py +++ b/synapse/groups/groups_server.py @@ -443,10 +443,10 @@ class GroupsServerHandler(object): "user_id": user_id } try: - profile = yield self.profile_handler.get_profile_from_cache(user) + profile = yield self.profile_handler.get_profile_from_cache(user_id) user_profile.update(profile) except Exception as e: - pass + logger.warn("Error getting profile for %s: %s", user_id, e) user_profiles.append(user_profile) defer.returnValue({ From 85f5674e44d695177cbff74e11b4ce6dac85d53a Mon Sep 17 00:00:00 2001 From: Luke Barnard Date: Mon, 16 Oct 2017 15:52:12 +0100 Subject: [PATCH 38/53] Delint --- synapse/federation/transport/server.py | 2 ++ synapse/groups/groups_server.py | 1 - synapse/rest/client/v2_alpha/groups.py | 1 + 3 files changed, 3 insertions(+), 1 deletion(-) diff --git a/synapse/federation/transport/server.py b/synapse/federation/transport/server.py index 625a2fe27f..6a0bd8d222 100644 --- a/synapse/federation/transport/server.py +++ b/synapse/federation/transport/server.py @@ -720,6 +720,7 @@ class FederationGroupsUsersServlet(BaseFederationServlet): defer.returnValue((200, new_content)) + class FederationGroupsInvitedUsersServlet(BaseFederationServlet): """Get the users that have been invited to a group """ @@ -737,6 +738,7 @@ class FederationGroupsInvitedUsersServlet(BaseFederationServlet): defer.returnValue((200, new_content)) + class FederationGroupsInviteServlet(BaseFederationServlet): """Ask a group server to invite someone to the group """ diff --git a/synapse/groups/groups_server.py b/synapse/groups/groups_server.py index 31fc711876..a3a500b9d6 100644 --- a/synapse/groups/groups_server.py +++ b/synapse/groups/groups_server.py @@ -454,7 +454,6 @@ class GroupsServerHandler(object): "total_user_count_estimate": len(invited_users), }) - @defer.inlineCallbacks def get_rooms_in_group(self, group_id, requester_user_id): """Get the rooms in group as seen by requester_user_id diff --git a/synapse/rest/client/v2_alpha/groups.py b/synapse/rest/client/v2_alpha/groups.py index 4532112cfc..d11bccc1da 100644 --- a/synapse/rest/client/v2_alpha/groups.py +++ b/synapse/rest/client/v2_alpha/groups.py @@ -370,6 +370,7 @@ class GroupUsersServlet(RestServlet): defer.returnValue((200, result)) + class GroupInvitedUsersServlet(RestServlet): """Get users invited to a group """ From c05e6015cc522b838ab2db6bffd494b017cf8ec6 Mon Sep 17 00:00:00 2001 From: David Baker Date: Mon, 16 Oct 2017 17:57:27 +0100 Subject: [PATCH 39/53] Add config option to auto-join new users to rooms New users who register on the server will be dumped into all rooms in auto_join_rooms in the config. --- synapse/config/registration.py | 6 +++++ synapse/rest/client/v2_alpha/register.py | 34 ++++++++++++++++++++++++ 2 files changed, 40 insertions(+) diff --git a/synapse/config/registration.py b/synapse/config/registration.py index f7e03c4cde..9e2a6d1ae5 100644 --- a/synapse/config/registration.py +++ b/synapse/config/registration.py @@ -41,6 +41,8 @@ class RegistrationConfig(Config): self.allow_guest_access and config.get("invite_3pid_guest", False) ) + self.auto_join_rooms = config.get("auto_join_rooms", []) + def default_config(self, **kwargs): registration_shared_secret = random_string_with_symbols(50) @@ -70,6 +72,10 @@ class RegistrationConfig(Config): - matrix.org - vector.im - riot.im + + # Users who register on this homeserver will automatically be joined to these rooms + #auto_join_rooms: + # - "#example:example.com" """ % locals() def add_arguments(self, parser): diff --git a/synapse/rest/client/v2_alpha/register.py b/synapse/rest/client/v2_alpha/register.py index 1421c18152..d9a8cdbbb5 100644 --- a/synapse/rest/client/v2_alpha/register.py +++ b/synapse/rest/client/v2_alpha/register.py @@ -17,8 +17,10 @@ from twisted.internet import defer import synapse +import synapse.types from synapse.api.auth import get_access_token_from_request, has_access_token from synapse.api.constants import LoginType +from synapse.types import RoomID, RoomAlias from synapse.api.errors import SynapseError, Codes, UnrecognizedRequestError from synapse.http.servlet import ( RestServlet, parse_json_object_from_request, assert_params_in_request, parse_string @@ -170,6 +172,7 @@ class RegisterRestServlet(RestServlet): self.auth_handler = hs.get_auth_handler() self.registration_handler = hs.get_handlers().registration_handler self.identity_handler = hs.get_handlers().identity_handler + self.room_member_handler = hs.get_handlers().room_member_handler self.device_handler = hs.get_device_handler() self.macaroon_gen = hs.get_macaroon_generator() @@ -340,6 +343,14 @@ class RegisterRestServlet(RestServlet): generate_token=False, ) + # auto-join the user to any rooms we're supposed to dump them into + fake_requester = synapse.types.create_requester(registered_user_id) + for r in self.hs.config.auto_join_rooms: + try: + yield self._join_user_to_room(fake_requester, r) + except Exception as e: + logger.error("Failed to join new user to %r: %r", r, e) + # remember that we've now registered that user account, and with # what user ID (since the user may not have specified) self.auth_handler.set_session_data( @@ -372,6 +383,29 @@ class RegisterRestServlet(RestServlet): def on_OPTIONS(self, _): return 200, {} + @defer.inlineCallbacks + def _join_user_to_room(self, requester, room_identifier): + room_id = None + if RoomID.is_valid(room_identifier): + room_id = room_identifier + elif RoomAlias.is_valid(room_identifier): + room_alias = RoomAlias.from_string(room_identifier) + room_id, remote_room_hosts = ( + yield self.room_member_handler.lookup_room_alias(room_alias) + ) + room_id = room_id.to_string() + else: + raise SynapseError(400, "%s was not legal room ID or room alias" % ( + room_identifier, + )) + + yield self.room_member_handler.update_membership( + requester=requester, + target=requester.user, + room_id=room_id, + action="join", + ) + @defer.inlineCallbacks def _do_appservice_registration(self, username, as_token, body): user_id = yield self.registration_handler.appservice_register( From a9c2e930ac455db13ce37a90b3ff0d93c0d1ea43 Mon Sep 17 00:00:00 2001 From: David Baker Date: Tue, 17 Oct 2017 10:13:13 +0100 Subject: [PATCH 40/53] pep8 --- synapse/config/registration.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/synapse/config/registration.py b/synapse/config/registration.py index 9e2a6d1ae5..ef917fc9f2 100644 --- a/synapse/config/registration.py +++ b/synapse/config/registration.py @@ -73,7 +73,8 @@ class RegistrationConfig(Config): - vector.im - riot.im - # Users who register on this homeserver will automatically be joined to these rooms + # Users who register on this homeserver will automatically be joined + # to these rooms #auto_join_rooms: # - "#example:example.com" """ % locals() From 33122c5a1b3642712546bc290d591077ce4cc847 Mon Sep 17 00:00:00 2001 From: David Baker Date: Tue, 17 Oct 2017 10:39:50 +0100 Subject: [PATCH 41/53] Fix test --- tests/rest/client/v2_alpha/test_register.py | 1 + 1 file changed, 1 insertion(+) diff --git a/tests/rest/client/v2_alpha/test_register.py b/tests/rest/client/v2_alpha/test_register.py index b6173ab2ee..821c735528 100644 --- a/tests/rest/client/v2_alpha/test_register.py +++ b/tests/rest/client/v2_alpha/test_register.py @@ -47,6 +47,7 @@ class RegisterRestServletTestCase(unittest.TestCase): self.hs.get_auth_handler = Mock(return_value=self.auth_handler) self.hs.get_device_handler = Mock(return_value=self.device_handler) self.hs.config.enable_registration = True + self.hs.config.auto_join_rooms = [] # init the thing we're testing self.servlet = RegisterRestServlet(self.hs) From 9b714abf357fbe16482a729933aa8e139ed278ad Mon Sep 17 00:00:00 2001 From: Richard van der Hoff Date: Tue, 17 Oct 2017 10:43:36 +0100 Subject: [PATCH 42/53] Remove dead class This isn't used anywhere. --- tests/storage/event_injector.py | 76 --------------------------------- 1 file changed, 76 deletions(-) delete mode 100644 tests/storage/event_injector.py diff --git a/tests/storage/event_injector.py b/tests/storage/event_injector.py deleted file mode 100644 index 024ac15069..0000000000 --- a/tests/storage/event_injector.py +++ /dev/null @@ -1,76 +0,0 @@ -# -*- coding: utf-8 -*- -# Copyright 2015, 2016 OpenMarket Ltd -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - - -from twisted.internet import defer - -from synapse.api.constants import EventTypes - - -class EventInjector: - def __init__(self, hs): - self.hs = hs - self.store = hs.get_datastore() - self.message_handler = hs.get_handlers().message_handler - self.event_builder_factory = hs.get_event_builder_factory() - - @defer.inlineCallbacks - def create_room(self, room, user): - builder = self.event_builder_factory.new({ - "type": EventTypes.Create, - "sender": user.to_string(), - "room_id": room.to_string(), - "content": {}, - }) - - event, context = yield self.message_handler._create_new_client_event( - builder - ) - - yield self.store.persist_event(event, context) - - @defer.inlineCallbacks - def inject_room_member(self, room, user, membership): - builder = self.event_builder_factory.new({ - "type": EventTypes.Member, - "sender": user.to_string(), - "state_key": user.to_string(), - "room_id": room.to_string(), - "content": {"membership": membership}, - }) - - event, context = yield self.message_handler._create_new_client_event( - builder - ) - - yield self.store.persist_event(event, context) - - defer.returnValue(event) - - @defer.inlineCallbacks - def inject_message(self, room, user, body): - builder = self.event_builder_factory.new({ - "type": EventTypes.Message, - "sender": user.to_string(), - "state_key": user.to_string(), - "room_id": room.to_string(), - "content": {"body": body, "msgtype": u"message"}, - }) - - event, context = yield self.message_handler._create_new_client_event( - builder - ) - - yield self.store.persist_event(event, context) From 5b5f35ccc0cdd74fe8c8902988da222a7b8e2a67 Mon Sep 17 00:00:00 2001 From: Richard van der Hoff Date: Tue, 17 Oct 2017 10:52:31 +0100 Subject: [PATCH 43/53] Add some tests for make_deferred_yieldable --- tests/util/test_log_context.py | 38 ++++++++++++++++++++++++++++++++++ 1 file changed, 38 insertions(+) diff --git a/tests/util/test_log_context.py b/tests/util/test_log_context.py index 9ffe209c4d..e2f7765f49 100644 --- a/tests/util/test_log_context.py +++ b/tests/util/test_log_context.py @@ -94,3 +94,41 @@ class LoggingContextTestCase(unittest.TestCase): yield defer.succeed(None) return self._test_preserve_fn(nonblocking_function) + + @defer.inlineCallbacks + def test_make_deferred_yieldable(self): + # a function which retuns an incomplete deferred, but doesn't follow + # the synapse rules. + def blocking_function(): + d = defer.Deferred() + reactor.callLater(0, d.callback, None) + return d + + sentinel_context = LoggingContext.current_context() + + with LoggingContext() as context_one: + context_one.test_key = "one" + + d1 = logcontext.make_deferred_yieldable(blocking_function()) + # make sure that the context was reset by make_deferred_yieldable + self.assertIs(LoggingContext.current_context(), sentinel_context) + + yield d1 + + # now it should be restored + self._check_test_key("one") + + @defer.inlineCallbacks + def test_make_deferred_yieldable_on_non_deferred(self): + """Check that make_deferred_yieldable does the right thing when its + argument isn't actually a deferred""" + + with LoggingContext() as context_one: + context_one.test_key = "one" + + d1 = logcontext.make_deferred_yieldable("bum") + self._check_test_key("one") + + r = yield d1 + self.assertEqual(r, "bum") + self._check_test_key("one") From a6ad8148b9d3058e316589a952744ecb15aa2057 Mon Sep 17 00:00:00 2001 From: Richard van der Hoff Date: Tue, 17 Oct 2017 10:53:34 +0100 Subject: [PATCH 44/53] Fix name of test_logcontext The file under test is logcontext.py, not log_context.py --- tests/util/{test_log_context.py => test_logcontext.py} | 0 1 file changed, 0 insertions(+), 0 deletions(-) rename tests/util/{test_log_context.py => test_logcontext.py} (100%) diff --git a/tests/util/test_log_context.py b/tests/util/test_logcontext.py similarity index 100% rename from tests/util/test_log_context.py rename to tests/util/test_logcontext.py From 2e9f5ea31a9c66eceb6276c5241cc6537cb0ae4c Mon Sep 17 00:00:00 2001 From: Richard van der Hoff Date: Tue, 17 Oct 2017 10:59:30 +0100 Subject: [PATCH 45/53] Fix logcontext handling for persist_events * don't use preserve_context_over_deferred, which is known broken. * remove a redundant preserve_fn. * add/improve some comments --- synapse/storage/events.py | 24 +++++++++++++++++------- synapse/util/async.py | 5 +++++ 2 files changed, 22 insertions(+), 7 deletions(-) diff --git a/synapse/storage/events.py b/synapse/storage/events.py index 4f0b43c36d..637640ec2a 100644 --- a/synapse/storage/events.py +++ b/synapse/storage/events.py @@ -21,7 +21,7 @@ from synapse.events.utils import prune_event from synapse.util.async import ObservableDeferred from synapse.util.logcontext import ( - preserve_fn, PreserveLoggingContext, preserve_context_over_deferred + preserve_fn, PreserveLoggingContext, make_deferred_yieldable ) from synapse.util.logutils import log_function from synapse.util.metrics import Measure @@ -88,13 +88,23 @@ class _EventPeristenceQueue(object): def add_to_queue(self, room_id, events_and_contexts, backfilled): """Add events to the queue, with the given persist_event options. + NB: due to the normal usage pattern of this method, it does *not* + follow the synapse logcontext rules, and leaves the logcontext in + place whether or not the returned deferred is ready. + Args: room_id (str): events_and_contexts (list[(EventBase, EventContext)]): backfilled (bool): + + Returns: + defer.Deferred: a deferred which will resolve once the events are + persisted. Runs its callbacks *without* a logcontext. """ queue = self._event_persist_queues.setdefault(room_id, deque()) if queue: + # if the last item in the queue has the same `backfilled` setting, + # we can just add these new events to that item. end_item = queue[-1] if end_item.backfilled == backfilled: end_item.events_and_contexts.extend(events_and_contexts) @@ -113,11 +123,11 @@ class _EventPeristenceQueue(object): def handle_queue(self, room_id, per_item_callback): """Attempts to handle the queue for a room if not already being handled. - The given callback will be invoked with for each item in the queue,1 + The given callback will be invoked with for each item in the queue, of type _EventPersistQueueItem. The per_item_callback will continuously be called with new items, unless the queue becomnes empty. The return value of the function will be given to the deferreds waiting on the item, - exceptions will be passed to the deferres as well. + exceptions will be passed to the deferreds as well. This function should therefore be called whenever anything is added to the queue. @@ -233,7 +243,7 @@ class EventsStore(SQLBaseStore): deferreds = [] for room_id, evs_ctxs in partitioned.iteritems(): - d = preserve_fn(self._event_persist_queue.add_to_queue)( + d = self._event_persist_queue.add_to_queue( room_id, evs_ctxs, backfilled=backfilled, ) @@ -242,7 +252,7 @@ class EventsStore(SQLBaseStore): for room_id in partitioned: self._maybe_start_persisting(room_id) - return preserve_context_over_deferred( + return make_deferred_yieldable( defer.gatherResults(deferreds, consumeErrors=True) ) @@ -267,7 +277,7 @@ class EventsStore(SQLBaseStore): self._maybe_start_persisting(event.room_id) - yield preserve_context_over_deferred(deferred) + yield make_deferred_yieldable(deferred) max_persisted_id = yield self._stream_id_gen.get_current_token() defer.returnValue((event.internal_metadata.stream_ordering, max_persisted_id)) @@ -1526,7 +1536,7 @@ class EventsStore(SQLBaseStore): if not allow_rejected: rows[:] = [r for r in rows if not r["rejects"]] - res = yield preserve_context_over_deferred(defer.gatherResults( + res = yield make_deferred_yieldable(defer.gatherResults( [ preserve_fn(self._get_event_from_row)( row["internal_metadata"], row["json"], row["redacts"], diff --git a/synapse/util/async.py b/synapse/util/async.py index 0fd5b42523..a0a9039475 100644 --- a/synapse/util/async.py +++ b/synapse/util/async.py @@ -53,6 +53,11 @@ class ObservableDeferred(object): Cancelling or otherwise resolving an observer will not affect the original ObservableDeferred. + + NB that it does not attempt to do anything with logcontexts; in general + you should probably make_deferred_yieldable the deferreds + returned by `observe`, and ensure that the original deferred runs its + callbacks in the sentinel logcontext. """ __slots__ = ["_deferred", "_observers", "_result"] From a6245478c82d8bd2c9abea34b9ec4a94ccc5ed09 Mon Sep 17 00:00:00 2001 From: Krombel Date: Tue, 17 Oct 2017 12:45:33 +0200 Subject: [PATCH 46/53] fix thumbnailing (#2548) in commit 0e28281a the code for thumbnailing got refactored and the renaming of this variables was not done correctly. Signed-Off-by: Matthias Kesler --- synapse/rest/media/v1/media_repository.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/synapse/rest/media/v1/media_repository.py b/synapse/rest/media/v1/media_repository.py index 515b3d3e74..057c925b7b 100644 --- a/synapse/rest/media/v1/media_repository.py +++ b/synapse/rest/media/v1/media_repository.py @@ -490,12 +490,12 @@ class MediaRepository(object): if t_type == "crop": t_byte_source = yield make_deferred_yieldable(threads.deferToThread( thumbnailer.crop, - r_width, r_height, t_type, + t_width, t_height, t_type, )) else: t_byte_source = yield make_deferred_yieldable(threads.deferToThread( thumbnailer.scale, - r_width, r_height, t_type, + t_width, t_height, t_type, )) if not t_byte_source: From 7216c76654bbb57bc0ebc27498de7eda247f8ffb Mon Sep 17 00:00:00 2001 From: Richard van der Hoff Date: Tue, 17 Oct 2017 14:46:17 +0100 Subject: [PATCH 47/53] Improve error handling for missing files (#2551) `os.path.exists` doesn't allow us to distinguish between permissions errors and the path actually not existing, which repeatedly confuses people. It also means that we try to overwrite existing key files, which is super-confusing. (cf issues #2455, #2379). Use os.stat instead. Also, don't recomemnd the the use of --generate-config, which screws everything up if you're using debian (cf #2455). --- synapse/config/_base.py | 36 ++++++++++++++++++++++++++---------- synapse/config/key.py | 8 ++++---- synapse/config/tls.py | 6 +++--- 3 files changed, 33 insertions(+), 17 deletions(-) diff --git a/synapse/config/_base.py b/synapse/config/_base.py index 1ab5593c6e..fa105bce72 100644 --- a/synapse/config/_base.py +++ b/synapse/config/_base.py @@ -81,22 +81,38 @@ class Config(object): def abspath(file_path): return os.path.abspath(file_path) if file_path else file_path + @classmethod + def path_exists(cls, file_path): + """Check if a file exists + + Unlike os.path.exists, this throws an exception if there is an error + checking if the file exists (for example, if there is a perms error on + the parent dir). + + Returns: + bool: True if the file exists; False if not. + """ + try: + os.stat(file_path) + return True + except OSError as e: + if e.errno != errno.ENOENT: + raise e + return False + @classmethod def check_file(cls, file_path, config_name): if file_path is None: raise ConfigError( "Missing config for %s." - " You must specify a path for the config file. You can " - "do this with the -c or --config-path option. " - "Adding --generate-config along with --server-name " - " will generate a config file at the given path." % (config_name,) ) - if not os.path.exists(file_path): + try: + os.stat(file_path) + except OSError as e: raise ConfigError( - "File %s config for %s doesn't exist." - " Try running again with --generate-config" - % (file_path, config_name,) + "Error accessing file '%s' (config for %s): %s" + % (file_path, config_name, e.strerror) ) return cls.abspath(file_path) @@ -248,7 +264,7 @@ class Config(object): " -c CONFIG-FILE\"" ) (config_path,) = config_files - if not os.path.exists(config_path): + if not cls.path_exists(config_path): if config_args.keys_directory: config_dir_path = config_args.keys_directory else: @@ -261,7 +277,7 @@ class Config(object): "Must specify a server_name to a generate config for." " Pass -H server.name." ) - if not os.path.exists(config_dir_path): + if not cls.path_exists(config_dir_path): os.makedirs(config_dir_path) with open(config_path, "wb") as config_file: config_bytes, config = obj.generate_config( diff --git a/synapse/config/key.py b/synapse/config/key.py index 6ee643793e..4b8fc063d0 100644 --- a/synapse/config/key.py +++ b/synapse/config/key.py @@ -118,10 +118,9 @@ class KeyConfig(Config): signing_keys = self.read_file(signing_key_path, "signing_key") try: return read_signing_keys(signing_keys.splitlines(True)) - except Exception: + except Exception as e: raise ConfigError( - "Error reading signing_key." - " Try running again with --generate-config" + "Error reading signing_key: %s" % (str(e)) ) def read_old_signing_keys(self, old_signing_keys): @@ -141,7 +140,8 @@ class KeyConfig(Config): def generate_files(self, config): signing_key_path = config["signing_key_path"] - if not os.path.exists(signing_key_path): + + if not self.path_exists(signing_key_path): with open(signing_key_path, "w") as signing_key_file: key_id = "a_" + random_string(4) write_signing_keys( diff --git a/synapse/config/tls.py b/synapse/config/tls.py index e081840a83..247f18f454 100644 --- a/synapse/config/tls.py +++ b/synapse/config/tls.py @@ -126,7 +126,7 @@ class TlsConfig(Config): tls_private_key_path = config["tls_private_key_path"] tls_dh_params_path = config["tls_dh_params_path"] - if not os.path.exists(tls_private_key_path): + if not self.path_exists(tls_private_key_path): with open(tls_private_key_path, "w") as private_key_file: tls_private_key = crypto.PKey() tls_private_key.generate_key(crypto.TYPE_RSA, 2048) @@ -141,7 +141,7 @@ class TlsConfig(Config): crypto.FILETYPE_PEM, private_key_pem ) - if not os.path.exists(tls_certificate_path): + if not self.path_exists(tls_certificate_path): with open(tls_certificate_path, "w") as certificate_file: cert = crypto.X509() subject = cert.get_subject() @@ -159,7 +159,7 @@ class TlsConfig(Config): certificate_file.write(cert_pem) - if not os.path.exists(tls_dh_params_path): + if not self.path_exists(tls_dh_params_path): if GENERATE_DH_PARAMS: subprocess.check_call([ "openssl", "dhparam", From c2bd177ea0e58d9831167273faa9bf9155466513 Mon Sep 17 00:00:00 2001 From: Richard van der Hoff Date: Tue, 17 Oct 2017 17:05:42 +0100 Subject: [PATCH 48/53] Fix 500 error when fields missing from power_levels event If the users or events keys were missing from a power_levels event, then we would throw 500s when trying to auth them. --- synapse/event_auth.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/synapse/event_auth.py b/synapse/event_auth.py index 4096c606f1..9e746a28bf 100644 --- a/synapse/event_auth.py +++ b/synapse/event_auth.py @@ -470,14 +470,14 @@ def _check_power_levels(event, auth_events): ("invite", None), ] - old_list = current_state.content.get("users") + old_list = current_state.content.get("users", {}) for user in set(old_list.keys() + user_list.keys()): levels_to_check.append( (user, "users") ) - old_list = current_state.content.get("events") - new_list = event.content.get("events") + old_list = current_state.content.get("events", {}) + new_list = event.content.get("events", {}) for ev_id in set(old_list.keys() + new_list.keys()): levels_to_check.append( (ev_id, "events") From 74f99f227cb700e41fc6c309398c0e5a6a42439a Mon Sep 17 00:00:00 2001 From: Richard van der Hoff Date: Tue, 17 Oct 2017 19:46:56 +0100 Subject: [PATCH 49/53] Doc some more dynamic Homeserver methods --- synapse/server.pyi | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/synapse/server.pyi b/synapse/server.pyi index 9570df5537..e8c0386b7f 100644 --- a/synapse/server.pyi +++ b/synapse/server.pyi @@ -1,4 +1,6 @@ import synapse.api.auth +import synapse.federation.transaction_queue +import synapse.federation.transport.client import synapse.handlers import synapse.handlers.auth import synapse.handlers.device @@ -27,3 +29,9 @@ class HomeServer(object): def get_state_handler(self) -> synapse.state.StateHandler: pass + + def get_federation_sender(self) -> synapse.federation.transaction_queue.TransactionQueue: + pass + + def get_federation_transport_client(self) -> synapse.federation.transport.client.TransportLayerClient: + pass From 582bd19ee9de4850010b58e0bb0e2a93f01b2f44 Mon Sep 17 00:00:00 2001 From: Richard van der Hoff Date: Tue, 17 Oct 2017 20:46:55 +0100 Subject: [PATCH 50/53] Fix 500 error when we get an error handling a PDU FederationServer doesn't have a send_failure (and nor does its subclass, ReplicationLayer), so this was failing. I'm not really sure what the idea behind send_failure is, given (a) we don't do anything at the other end with it except log it, and (b) we also send back the failure via the transaction response. I suspect there's a whole lot of dead code around it, but for now I'm just removing the broken bit. --- synapse/federation/federation_server.py | 1 - 1 file changed, 1 deletion(-) diff --git a/synapse/federation/federation_server.py b/synapse/federation/federation_server.py index a8034bddc6..e15228e70b 100644 --- a/synapse/federation/federation_server.py +++ b/synapse/federation/federation_server.py @@ -192,7 +192,6 @@ class FederationServer(FederationBase): pdu_results[event_id] = {} except FederationError as e: logger.warn("Error handling PDU %s: %s", event_id, e) - self.send_failure(e, transaction.origin) pdu_results[event_id] = {"error": str(e)} except Exception as e: pdu_results[event_id] = {"error": str(e)} From 161a862ffb250b5387f1a2c57e07e999ba00571e Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 19 Oct 2017 10:17:43 +0100 Subject: [PATCH 51/53] Fix typo --- synapse/groups/attestations.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/synapse/groups/attestations.py b/synapse/groups/attestations.py index 5ef7a12cb7..02e2e17589 100644 --- a/synapse/groups/attestations.py +++ b/synapse/groups/attestations.py @@ -126,7 +126,7 @@ class GroupAttestionRenewer(object): ) @defer.inlineCallbacks - def _renew_attestation(self, group_id, user_id): + def _renew_attestation(group_id, user_id): attestation = self.attestations.create_attestation(group_id, user_id) if self.hs.is_mine_id(group_id): From bd5718d0ad2a9380ae292507a1022a230f8b2011 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 19 Oct 2017 10:27:18 +0100 Subject: [PATCH 52/53] Fix typo in thumbnail generation --- synapse/rest/media/v1/media_repository.py | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/synapse/rest/media/v1/media_repository.py b/synapse/rest/media/v1/media_repository.py index 057c925b7b..6b50b45b1f 100644 --- a/synapse/rest/media/v1/media_repository.py +++ b/synapse/rest/media/v1/media_repository.py @@ -487,16 +487,19 @@ class MediaRepository(object): ) # Generate the thumbnail - if t_type == "crop": + if t_method == "crop": t_byte_source = yield make_deferred_yieldable(threads.deferToThread( thumbnailer.crop, t_width, t_height, t_type, )) - else: + elif t_method == "scale": t_byte_source = yield make_deferred_yieldable(threads.deferToThread( thumbnailer.scale, t_width, t_height, t_type, )) + else: + logger.error("Unrecognized method: %r", t_method) + continue if not t_byte_source: continue From 9ab859f27b177bbcd4fe0b3eb1bef0ce649ea41e Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 19 Oct 2017 10:55:44 +0100 Subject: [PATCH 53/53] Fix typo in group attestation handling --- synapse/federation/transport/server.py | 2 +- synapse/groups/attestations.py | 3 ++- 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/synapse/federation/transport/server.py b/synapse/federation/transport/server.py index 6a0bd8d222..09b97138c3 100644 --- a/synapse/federation/transport/server.py +++ b/synapse/federation/transport/server.py @@ -835,7 +835,7 @@ class FederationGroupsRenewAttestaionServlet(BaseFederationServlet): def on_POST(self, origin, content, query, group_id, user_id): # We don't need to check auth here as we check the attestation signatures - new_content = yield self.handler.on_renew_group_attestation( + new_content = yield self.handler.on_renew_attestation( origin, content, group_id, user_id ) diff --git a/synapse/groups/attestations.py b/synapse/groups/attestations.py index 02e2e17589..b751cf5e43 100644 --- a/synapse/groups/attestations.py +++ b/synapse/groups/attestations.py @@ -90,6 +90,7 @@ class GroupAttestionRenewer(object): self.assestations = hs.get_groups_attestation_signing() self.transport_client = hs.get_federation_transport_client() self.is_mine_id = hs.is_mine_id + self.attestations = hs.get_groups_attestation_signing() self._renew_attestations_loop = self.clock.looping_call( self._renew_attestations, 30 * 60 * 1000, @@ -129,7 +130,7 @@ class GroupAttestionRenewer(object): def _renew_attestation(group_id, user_id): attestation = self.attestations.create_attestation(group_id, user_id) - if self.hs.is_mine_id(group_id): + if self.is_mine_id(group_id): destination = get_domain_from_id(user_id) else: destination = get_domain_from_id(group_id)