diff --git a/.gitignore b/.gitignore index b2a42f7..3e0fad4 100644 --- a/.gitignore +++ b/.gitignore @@ -5,9 +5,11 @@ *.pyc __pycache__ *.egg-info +build/ node_modules web/lib/import-map.json +web/packs/*.json *.session /*.json diff --git a/requirements.txt b/requirements.txt index 6d89dbf..560d2a5 100644 --- a/requirements.txt +++ b/requirements.txt @@ -4,3 +4,4 @@ pillow telethon cryptg python-magic +lottie[all] diff --git a/sticker/lib/util.py b/sticker/lib/util.py index 2b3fe2a..3e4432e 100644 --- a/sticker/lib/util.py +++ b/sticker/lib/util.py @@ -16,19 +16,131 @@ from functools import partial from io import BytesIO import os.path +import subprocess import json +import tempfile +import mimetypes -from PIL import Image +try: + import magic +except ImportError: + print("[Warning] Magic is not installed, using file extensions to guess mime types") + magic = None +from PIL import Image, ImageSequence from . import matrix open_utf8 = partial(open, encoding='UTF-8') -def convert_image(data: bytes) -> (bytes, int, int): - image: Image.Image = Image.open(BytesIO(data)).convert("RGBA") + +def guess_mime(data: bytes) -> str: + mime = None + if magic: + try: + return magic.Magic(mime=True).from_buffer(data) + except Exception: + pass + else: + with tempfile.NamedTemporaryFile() as temp: + temp.write(data) + temp.close() + mime, _ = mimetypes.guess_type(temp.name) + return mime or "image/png" + + +def video_to_webp(data: bytes) -> bytes: + mime = guess_mime(data) + ext = mimetypes.guess_extension(mime) + with tempfile.NamedTemporaryFile(suffix=ext) as video: + video.write(data) + video.flush() + with tempfile.NamedTemporaryFile(suffix=".webp") as webp: + print(".", end="", flush=True) + ffmpeg_encoder_args = [] + if mime == "video/webm": + encode = subprocess.run(["ffprobe", "-v", "error", "-select_streams", "v:0", "-show_entries", "stream=codec_name", "-of", "default=nokey=1:noprint_wrappers=1", video.name], capture_output=True, text=True).stdout.strip() + ffmpeg_encoder = None + if encode == "vp8": + ffmpeg_encoder = "libvpx" + elif encode == "vp9": + ffmpeg_encoder = "libvpx-vp9" + if ffmpeg_encoder: + ffmpeg_encoder_args = ["-c:v", ffmpeg_encoder] + result = subprocess.run(["ffmpeg", "-y", "-threads", "auto", *ffmpeg_encoder_args, "-i", video.name, "-lossless", "1", webp.name], + capture_output=True) + if result.returncode != 0: + raise RuntimeError(f"Run ffmpeg failed with code {result.returncode}, Error occurred:\n{result.stderr}") + webp.seek(0) + return webp.read() + + +def video_to_gif(data: bytes, mime: str) -> bytes: + ext = mimetypes.guess_extension(mime) + if mime.startswith("video/"): + # run ffmpeg to fix duration + with tempfile.NamedTemporaryFile(suffix=ext) as temp: + temp.write(data) + temp.flush() + with tempfile.NamedTemporaryFile(suffix=ext) as temp_fixed: + print(".", end="", flush=True) + result = subprocess.run(["ffmpeg", "-y", "-threads", "auto", "-i", temp.name, "-codec", "copy", temp_fixed.name], + capture_output=True) + if result.returncode != 0: + raise RuntimeError(f"Run ffmpeg failed with code {result.returncode}, Error occurred:\n{result.stderr}") + temp_fixed.seek(0) + data = temp_fixed.read() + data = video_to_webp(data) + with tempfile.NamedTemporaryFile(suffix=ext) as temp: + temp.write(data) + temp.flush() + with tempfile.NamedTemporaryFile(suffix=".gif") as gif: + print(".", end="", flush=True) + im = Image.open(temp.name) + im.info.pop('background', None) + im.save(gif.name, save_all=True, lossless=True, quality=100, method=6) + gif.seek(0) + return gif.read() + + +def opermize_gif(data: bytes) -> bytes: + with tempfile.NamedTemporaryFile() as gif: + gif.write(data) + gif.flush() + # use gifsicle to optimize gif + result = subprocess.run(["gifsicle", "--batch", "--optimize=3", "--colors=256", gif.name], + capture_output=True) + if result.returncode != 0: + raise RuntimeError(f"Run gifsicle failed with code {result.returncode}, Error occurred:\n{result.stderr}") + gif.seek(0) + return gif.read() + + +def _convert_image(data: bytes, mimetype: str) -> (bytes, int, int): + image: Image.Image = Image.open(BytesIO(data)) new_file = BytesIO() - image.save(new_file, "png") - w, h = image.size + suffix = mimetypes.guess_extension(mimetype) + if suffix: + suffix = suffix[1:] + # Determine if the image is a GIF + is_animated = getattr(image, "is_animated", False) + if is_animated: + frames = [frame.convert("RGBA") for frame in ImageSequence.Iterator(image)] + # Save the new GIF + frames[0].save( + new_file, + format='GIF', + save_all=True, + append_images=frames[1:], + loop=image.info.get('loop', 0), # Default loop to 0 if not present + duration=image.info.get('duration', 100), # Set a default duration if not present + disposal=image.info.get('disposal', 2) # Default to disposal method 2 (restore to background) + ) + # Get the size of the first frame to determine resizing + w, h = frames[0].size + else: + image = image.convert("RGBA") + image.save(new_file, format=suffix) + w, h = image.size if w > 256 or h > 256: # Set the width and height to lower values so clients wouldn't show them as huge images if w > h: @@ -40,6 +152,55 @@ def convert_image(data: bytes) -> (bytes, int, int): return new_file.getvalue(), w, h +def _convert_sticker(data: bytes) -> (bytes, str, int, int): + mimetype = guess_mime(data) + if mimetype.startswith("video/"): + data = video_to_gif(data, mimetype) + print(".", end="", flush=True) + mimetype = "image/gif" + elif mimetype.startswith("application/gzip"): + print(".", end="", flush=True) + # unzip file + import gzip + with gzip.open(BytesIO(data), "rb") as f: + data = f.read() + mimetype = guess_mime(data) + suffix = mimetypes.guess_extension(mimetype) + with tempfile.NamedTemporaryFile(suffix=suffix) as temp: + temp.write(data) + with tempfile.NamedTemporaryFile(suffix=".gif") as gif: + # run lottie_convert.py input output + print(".", end="", flush=True) + import subprocess + cmd = ["lottie_convert.py", temp.name, gif.name] + result = subprocess.run(cmd, capture_output=True, text=True) + retcode = result.returncode + if retcode != 0: + raise RuntimeError(f"Run {cmd} failed with code {retcode}, Error occurred:\n{result.stderr}") + gif.seek(0) + data = gif.read() + mimetype = "image/gif" + rlt = _convert_image(data, mimetype) + data = rlt[0] + if mimetype == "image/gif": + print(".", end="", flush=True) + data = opermize_gif(data) + return data, mimetype, rlt[1], rlt[2] + + +def convert_sticker(data: bytes) -> (bytes, str, int, int): + try: + return _convert_sticker(data) + except Exception as e: + mimetype = guess_mime(data) + print(f"Error converting image, mimetype: {mimetype}") + ext = mimetypes.guess_extension(mimetype) + with tempfile.NamedTemporaryFile(delete=False, suffix=ext) as temp: + temp.write(data) + print(f"Saved to {temp.name}") + raise e + + def add_to_index(name: str, output_dir: str) -> None: index_path = os.path.join(output_dir, "index.json") try: @@ -57,7 +218,7 @@ def add_to_index(name: str, output_dir: str) -> None: def make_sticker(mxc: str, width: int, height: int, size: int, - body: str = "") -> matrix.StickerInfo: + mimetype: str, body: str = "") -> matrix.StickerInfo: return { "body": body, "url": mxc, @@ -65,7 +226,7 @@ def make_sticker(mxc: str, width: int, height: int, size: int, "w": width, "h": height, "size": size, - "mimetype": "image/png", + "mimetype": mimetype, # Element iOS compatibility hack "thumbnail_url": mxc, @@ -73,7 +234,7 @@ def make_sticker(mxc: str, width: int, height: int, size: int, "w": width, "h": height, "size": size, - "mimetype": "image/png", + "mimetype": mimetype, }, }, "msgtype": "m.sticker", diff --git a/sticker/pack.py b/sticker/pack.py index f082370..4815bc5 100644 --- a/sticker/pack.py +++ b/sticker/pack.py @@ -77,11 +77,11 @@ async def upload_sticker(file: str, directory: str, old_stickers: Dict[str, matr } print(f".. using existing upload") else: - image_data, width, height = util.convert_image(image_data) + image_data, mimetype, width, height = util.convert_sticker(image_data) print(".", end="", flush=True) - mxc = await matrix.upload(image_data, "image/png", file) + mxc = await matrix.upload(image_data, mimetype, file) print(".", end="", flush=True) - sticker = util.make_sticker(mxc, width, height, len(image_data), name) + sticker = util.make_sticker(mxc, width, height, len(image_data), mimetype, name) sticker["id"] = sticker_id print(" uploaded", flush=True) return sticker diff --git a/sticker/stickerimport.py b/sticker/stickerimport.py index 534f3c4..6d1e7be 100644 --- a/sticker/stickerimport.py +++ b/sticker/stickerimport.py @@ -33,11 +33,11 @@ async def reupload_document(client: TelegramClient, document: Document) -> matri print(f"Reuploading {document.id}", end="", flush=True) data = await client.download_media(document, file=bytes) print(".", end="", flush=True) - data, width, height = util.convert_image(data) + data, mimetype, width, height = util.convert_sticker(data) print(".", end="", flush=True) - mxc = await matrix.upload(data, "image/png", f"{document.id}.png") + mxc = await matrix.upload(data, mimetype, f"{document.id}.png") print(".", flush=True) - return util.make_sticker(mxc, width, height, len(data)) + return util.make_sticker(mxc, width, height, len(data), mimetype) def add_meta(document: Document, info: matrix.StickerInfo, pack: StickerSetFull) -> None: