From 7769679ac341498347fa568883ac20680482aefd Mon Sep 17 00:00:00 2001 From: xz-dev Date: Tue, 16 Jul 2024 16:07:54 +0800 Subject: [PATCH] feat: support animated(webm) sticker --- requirements.txt | 1 + sticker/lib/util.py | 62 +++++++++++++++++++++++++++++++++++++--- sticker/pack.py | 6 ++-- sticker/stickerimport.py | 12 +++++--- 4 files changed, 70 insertions(+), 11 deletions(-) diff --git a/requirements.txt b/requirements.txt index 6d89dbf..bd4cfb0 100644 --- a/requirements.txt +++ b/requirements.txt @@ -4,3 +4,4 @@ pillow telethon cryptg python-magic +moviepy diff --git a/sticker/lib/util.py b/sticker/lib/util.py index 2b3fe2a..43e01d3 100644 --- a/sticker/lib/util.py +++ b/sticker/lib/util.py @@ -17,14 +17,50 @@ from functools import partial from io import BytesIO import os.path import json +import tempfile +import mimetypes +try: + import magic +except ImportError: + print("[Warning] Magic is not installed, using file extensions to guess mime types") + magic = None from PIL import Image from . import matrix open_utf8 = partial(open, encoding='UTF-8') -def convert_image(data: bytes) -> (bytes, int, int): + +def guess_mime(data: bytes) -> str: + mime = None + if magic: + try: + return magic.Magic(mime=True).from_buffer(data) + except Exception: + pass + else: + with tempfile.NamedTemporaryFile(delete=False) as temp: + temp.write(data) + temp.close() + mime, _ = mimetypes.guess_type(temp.name) + return mime or "image/png" + + +def video_to_gif(data: bytes, mime: str) -> bytes: + from moviepy.editor import VideoFileClip + ext = mimetypes.guess_extension(mime) + with tempfile.NamedTemporaryFile(suffix=ext) as temp: + temp.write(data) + temp.flush() + with tempfile.NamedTemporaryFile(suffix=".gif") as gif: + clip = VideoFileClip(temp.name) + clip.write_gif(gif.name, logger=None) + gif.seek(0) + return gif.read() + + +def _convert_image(data: bytes) -> (bytes, int, int): image: Image.Image = Image.open(BytesIO(data)).convert("RGBA") new_file = BytesIO() image.save(new_file, "png") @@ -40,6 +76,24 @@ def convert_image(data: bytes) -> (bytes, int, int): return new_file.getvalue(), w, h +def convert_image(data: bytes) -> (bytes, str, int, int): + mimetype = guess_mime(data) + if mimetype.startswith("video/"): + data = video_to_gif(data, mimetype) + print(".", end="", flush=True) + mimetype = "image/gif" + try: + rlt = _convert_image(data) + return rlt[0], mimetype, rlt[1], rlt[2] + except Exception as e: + print(f"Error converting image, mimetype: {mimetype}") + ext = mimetypes.guess_extension(mimetype) + with tempfile.NamedTemporaryFile(delete=False, suffix=ext) as temp: + temp.write(data) + print(f"Saved to {temp.name}") + raise e + + def add_to_index(name: str, output_dir: str) -> None: index_path = os.path.join(output_dir, "index.json") try: @@ -57,7 +111,7 @@ def add_to_index(name: str, output_dir: str) -> None: def make_sticker(mxc: str, width: int, height: int, size: int, - body: str = "") -> matrix.StickerInfo: + mimetype: str, body: str = "") -> matrix.StickerInfo: return { "body": body, "url": mxc, @@ -65,7 +119,7 @@ def make_sticker(mxc: str, width: int, height: int, size: int, "w": width, "h": height, "size": size, - "mimetype": "image/png", + "mimetype": mimetype, # Element iOS compatibility hack "thumbnail_url": mxc, @@ -73,7 +127,7 @@ def make_sticker(mxc: str, width: int, height: int, size: int, "w": width, "h": height, "size": size, - "mimetype": "image/png", + "mimetype": mimetype, }, }, "msgtype": "m.sticker", diff --git a/sticker/pack.py b/sticker/pack.py index f082370..6b1a646 100644 --- a/sticker/pack.py +++ b/sticker/pack.py @@ -77,11 +77,11 @@ async def upload_sticker(file: str, directory: str, old_stickers: Dict[str, matr } print(f".. using existing upload") else: - image_data, width, height = util.convert_image(image_data) + image_data, mimetype, width, height = util.convert_image(image_data) print(".", end="", flush=True) - mxc = await matrix.upload(image_data, "image/png", file) + mxc = await matrix.upload(image_data, mimetype, file) print(".", end="", flush=True) - sticker = util.make_sticker(mxc, width, height, len(image_data), name) + sticker = util.make_sticker(mxc, width, height, len(image_data), mimetype, name) sticker["id"] = sticker_id print(" uploaded", flush=True) return sticker diff --git a/sticker/stickerimport.py b/sticker/stickerimport.py index 534f3c4..eabea4c 100644 --- a/sticker/stickerimport.py +++ b/sticker/stickerimport.py @@ -33,11 +33,11 @@ async def reupload_document(client: TelegramClient, document: Document) -> matri print(f"Reuploading {document.id}", end="", flush=True) data = await client.download_media(document, file=bytes) print(".", end="", flush=True) - data, width, height = util.convert_image(data) + data, mimetype, width, height = util.convert_image(data) print(".", end="", flush=True) - mxc = await matrix.upload(data, "image/png", f"{document.id}.png") + mxc = await matrix.upload(data, mimetype, f"{document.id}.png") print(".", flush=True) - return util.make_sticker(mxc, width, height, len(data)) + return util.make_sticker(mxc, width, height, len(data), mimetype) def add_meta(document: Document, info: matrix.StickerInfo, pack: StickerSetFull) -> None: @@ -81,7 +81,11 @@ async def reupload_pack(client: TelegramClient, pack: StickerSetFull, output_dir reuploaded_documents[document.id] = already_uploaded[document.id] print(f"Skipped reuploading {document.id}") except KeyError: - reuploaded_documents[document.id] = await reupload_document(client, document) + try: + reuploaded_documents[document.id] = await reupload_document(client, document) + except Exception as e: + print(f"Failed to reupload {document.id}: {e}") + continue # Always ensure the body and telegram metadata is correct add_meta(document, reuploaded_documents[document.id], pack)