From a0ef9f84be89f4b7ff6239215d16cfc63f318b7b Mon Sep 17 00:00:00 2001 From: xz-dev Date: Sun, 15 Sep 2024 13:21:41 +0800 Subject: [PATCH] perf: convert static animated WebP to PNG --- sticker/lib/util.py | 89 +++++++++++++++++++++++++++++++-------------- 1 file changed, 61 insertions(+), 28 deletions(-) diff --git a/sticker/lib/util.py b/sticker/lib/util.py index 3e4432e..580824c 100644 --- a/sticker/lib/util.py +++ b/sticker/lib/util.py @@ -15,6 +15,7 @@ # along with this program. If not, see . from functools import partial from io import BytesIO +import numpy as np import os.path import subprocess import json @@ -48,7 +49,7 @@ def guess_mime(data: bytes) -> str: return mime or "image/png" -def video_to_webp(data: bytes) -> bytes: +def _video_to_webp(data: bytes) -> bytes: mime = guess_mime(data) ext = mimetypes.guess_extension(mime) with tempfile.NamedTemporaryFile(suffix=ext) as video: @@ -74,32 +75,62 @@ def video_to_webp(data: bytes) -> bytes: return webp.read() -def video_to_gif(data: bytes, mime: str) -> bytes: +def video_to_webp(data: bytes) -> bytes: + mime = guess_mime(data) ext = mimetypes.guess_extension(mime) - if mime.startswith("video/"): - # run ffmpeg to fix duration - with tempfile.NamedTemporaryFile(suffix=ext) as temp: - temp.write(data) - temp.flush() - with tempfile.NamedTemporaryFile(suffix=ext) as temp_fixed: - print(".", end="", flush=True) - result = subprocess.run(["ffmpeg", "-y", "-threads", "auto", "-i", temp.name, "-codec", "copy", temp_fixed.name], - capture_output=True) - if result.returncode != 0: - raise RuntimeError(f"Run ffmpeg failed with code {result.returncode}, Error occurred:\n{result.stderr}") - temp_fixed.seek(0) - data = temp_fixed.read() - data = video_to_webp(data) + # run ffmpeg to fix duration with tempfile.NamedTemporaryFile(suffix=ext) as temp: temp.write(data) temp.flush() - with tempfile.NamedTemporaryFile(suffix=".gif") as gif: + with tempfile.NamedTemporaryFile(suffix=ext) as temp_fixed: print(".", end="", flush=True) - im = Image.open(temp.name) + result = subprocess.run(["ffmpeg", "-y", "-threads", "auto", "-i", temp.name, "-codec", "copy", temp_fixed.name], + capture_output=True) + if result.returncode != 0: + raise RuntimeError(f"Run ffmpeg failed with code {result.returncode}, Error occurred:\n{result.stderr}") + temp_fixed.seek(0) + data = temp_fixed.read() + return _video_to_webp(data) + + +def webp_to_others(data: bytes, mimetype: str) -> bytes: + with tempfile.NamedTemporaryFile(suffix=".webp") as webp: + webp.write(data) + webp.flush() + ext = mimetypes.guess_extension(mimetype) + with tempfile.NamedTemporaryFile(suffix=ext) as img: + print(".", end="", flush=True) + im = Image.open(webp.name) im.info.pop('background', None) - im.save(gif.name, save_all=True, lossless=True, quality=100, method=6) - gif.seek(0) - return gif.read() + im.save(img.name, save_all=True, lossless=True, quality=100, method=6) + img.seek(0) + return img.read() + + +def is_uniform_animated_webp(data: bytes) -> bool: + img = Image.open(BytesIO(data)) + if img.n_frames <= 1: + return False + + first_frame = np.array(img) + for frame_number in range(1, img.n_frames): + img.seek(frame_number) + current_frame = np.array(img) + if not np.array_equal(first_frame, current_frame): + return False + + return True + + +def webp_to_gif_or_png(data: bytes) -> bytes: + # check if the webp is animated + image: Image.Image = Image.open(BytesIO(data)) + is_animated = getattr(image, "is_animated", False) + if is_animated and is_uniform_animated_webp(data): + return webp_to_others(data, "image/gif") + else: + # convert to png + return webp_to_others(data, "image/png") def opermize_gif(data: bytes) -> bytes: @@ -118,9 +149,6 @@ def opermize_gif(data: bytes) -> bytes: def _convert_image(data: bytes, mimetype: str) -> (bytes, int, int): image: Image.Image = Image.open(BytesIO(data)) new_file = BytesIO() - suffix = mimetypes.guess_extension(mimetype) - if suffix: - suffix = suffix[1:] # Determine if the image is a GIF is_animated = getattr(image, "is_animated", False) if is_animated: @@ -138,6 +166,9 @@ def _convert_image(data: bytes, mimetype: str) -> (bytes, int, int): # Get the size of the first frame to determine resizing w, h = frames[0].size else: + suffix = mimetypes.guess_extension(mimetype) + if suffix: + suffix = suffix[1:] image = image.convert("RGBA") image.save(new_file, format=suffix) w, h = image.size @@ -155,9 +186,8 @@ def _convert_image(data: bytes, mimetype: str) -> (bytes, int, int): def _convert_sticker(data: bytes) -> (bytes, str, int, int): mimetype = guess_mime(data) if mimetype.startswith("video/"): - data = video_to_gif(data, mimetype) + data = video_to_webp(data) print(".", end="", flush=True) - mimetype = "image/gif" elif mimetype.startswith("application/gzip"): print(".", end="", flush=True) # unzip file @@ -168,7 +198,7 @@ def _convert_sticker(data: bytes) -> (bytes, str, int, int): suffix = mimetypes.guess_extension(mimetype) with tempfile.NamedTemporaryFile(suffix=suffix) as temp: temp.write(data) - with tempfile.NamedTemporaryFile(suffix=".gif") as gif: + with tempfile.NamedTemporaryFile(suffix=".webp") as gif: # run lottie_convert.py input output print(".", end="", flush=True) import subprocess @@ -179,7 +209,10 @@ def _convert_sticker(data: bytes) -> (bytes, str, int, int): raise RuntimeError(f"Run {cmd} failed with code {retcode}, Error occurred:\n{result.stderr}") gif.seek(0) data = gif.read() - mimetype = "image/gif" + mimetype = guess_mime(data) + if mimetype == "image/webp": + data = webp_to_gif_or_png(data) + mimetype = guess_mime(data) rlt = _convert_image(data, mimetype) data = rlt[0] if mimetype == "image/gif":