feat: support animated(webm) sticker

This commit is contained in:
xz-dev 2024-07-16 16:07:54 +08:00
parent 333567f481
commit 715f62af58
No known key found for this signature in database
GPG Key ID: A20912F811313E36
4 changed files with 65 additions and 10 deletions

View File

@ -4,3 +4,4 @@ pillow
telethon telethon
cryptg cryptg
python-magic python-magic
moviepy

View File

@ -17,14 +17,50 @@ from functools import partial
from io import BytesIO from io import BytesIO
import os.path import os.path
import json import json
import tempfile
import mimetypes
try:
import magic
except ImportError:
print("[Warning] Magic is not installed, using file extensions to guess mime types")
magic = None
from PIL import Image from PIL import Image
from . import matrix from . import matrix
open_utf8 = partial(open, encoding='UTF-8') open_utf8 = partial(open, encoding='UTF-8')
def convert_image(data: bytes) -> (bytes, int, int):
def guess_mime(data: bytes) -> str:
mime = None
if magic:
try:
return magic.Magic(mime=True).from_buffer(data)
except Exception:
pass
else:
with tempfile.NamedTemporaryFile(delete=False) as temp:
temp.write(data)
temp.close()
mime, _ = mimetypes.guess_type(temp.name)
return mime or "image/png"
def video_to_gif(data: bytes, mime: str) -> bytes:
from moviepy.editor import VideoFileClip
ext = mimetypes.guess_extension(mime)
with tempfile.NamedTemporaryFile(suffix=ext) as temp:
temp.write(data)
temp.flush()
with tempfile.NamedTemporaryFile(suffix=".gif") as gif:
clip = VideoFileClip(temp.name)
clip.write_gif(gif.name, logger=None)
gif.seek(0)
return gif.read()
def _convert_image(data: bytes) -> (bytes, int, int):
image: Image.Image = Image.open(BytesIO(data)).convert("RGBA") image: Image.Image = Image.open(BytesIO(data)).convert("RGBA")
new_file = BytesIO() new_file = BytesIO()
image.save(new_file, "png") image.save(new_file, "png")
@ -40,6 +76,24 @@ def convert_image(data: bytes) -> (bytes, int, int):
return new_file.getvalue(), w, h return new_file.getvalue(), w, h
def convert_image(data: bytes) -> (bytes, str, int, int):
mimetype = guess_mime(data)
if mimetype.startswith("video/"):
data = video_to_gif(data, mimetype)
print(".", end="", flush=True)
mimetype = "image/gif"
try:
rlt = _convert_image(data)
return rlt[0], mimetype, rlt[1], rlt[2]
except Exception as e:
print(f"Error converting image, mimetype: {mimetype}")
ext = mimetypes.guess_extension(mimetype)
with tempfile.NamedTemporaryFile(delete=False, suffix=ext) as temp:
temp.write(data)
print(f"Saved to {temp.name}")
raise e
def add_to_index(name: str, output_dir: str) -> None: def add_to_index(name: str, output_dir: str) -> None:
index_path = os.path.join(output_dir, "index.json") index_path = os.path.join(output_dir, "index.json")
try: try:
@ -57,7 +111,7 @@ def add_to_index(name: str, output_dir: str) -> None:
def make_sticker(mxc: str, width: int, height: int, size: int, def make_sticker(mxc: str, width: int, height: int, size: int,
body: str = "") -> matrix.StickerInfo: mimetype: str, body: str = "") -> matrix.StickerInfo:
return { return {
"body": body, "body": body,
"url": mxc, "url": mxc,
@ -65,7 +119,7 @@ def make_sticker(mxc: str, width: int, height: int, size: int,
"w": width, "w": width,
"h": height, "h": height,
"size": size, "size": size,
"mimetype": "image/png", "mimetype": mimetype,
# Element iOS compatibility hack # Element iOS compatibility hack
"thumbnail_url": mxc, "thumbnail_url": mxc,
@ -73,7 +127,7 @@ def make_sticker(mxc: str, width: int, height: int, size: int,
"w": width, "w": width,
"h": height, "h": height,
"size": size, "size": size,
"mimetype": "image/png", "mimetype": mimetype,
}, },
}, },
"msgtype": "m.sticker", "msgtype": "m.sticker",

View File

@ -77,11 +77,11 @@ async def upload_sticker(file: str, directory: str, old_stickers: Dict[str, matr
} }
print(f".. using existing upload") print(f".. using existing upload")
else: else:
image_data, width, height = util.convert_image(image_data) image_data, mimetype, width, height = util.convert_image(image_data)
print(".", end="", flush=True) print(".", end="", flush=True)
mxc = await matrix.upload(image_data, "image/png", file) mxc = await matrix.upload(image_data, mimetype, file)
print(".", end="", flush=True) print(".", end="", flush=True)
sticker = util.make_sticker(mxc, width, height, len(image_data), name) sticker = util.make_sticker(mxc, width, height, len(image_data), mimetype, name)
sticker["id"] = sticker_id sticker["id"] = sticker_id
print(" uploaded", flush=True) print(" uploaded", flush=True)
return sticker return sticker

View File

@ -33,11 +33,11 @@ async def reupload_document(client: TelegramClient, document: Document) -> matri
print(f"Reuploading {document.id}", end="", flush=True) print(f"Reuploading {document.id}", end="", flush=True)
data = await client.download_media(document, file=bytes) data = await client.download_media(document, file=bytes)
print(".", end="", flush=True) print(".", end="", flush=True)
data, width, height = util.convert_image(data) data, mimetype, width, height = util.convert_image(data)
print(".", end="", flush=True) print(".", end="", flush=True)
mxc = await matrix.upload(data, "image/png", f"{document.id}.png") mxc = await matrix.upload(data, mimetype, f"{document.id}.png")
print(".", flush=True) print(".", flush=True)
return util.make_sticker(mxc, width, height, len(data)) return util.make_sticker(mxc, width, height, len(data), mimetype)
def add_meta(document: Document, info: matrix.StickerInfo, pack: StickerSetFull) -> None: def add_meta(document: Document, info: matrix.StickerInfo, pack: StickerSetFull) -> None: