feat: support animated(webm) sticker
This commit is contained in:
parent
333567f481
commit
7769679ac3
|
@ -4,3 +4,4 @@ pillow
|
||||||
telethon
|
telethon
|
||||||
cryptg
|
cryptg
|
||||||
python-magic
|
python-magic
|
||||||
|
moviepy
|
||||||
|
|
|
@ -17,14 +17,50 @@ from functools import partial
|
||||||
from io import BytesIO
|
from io import BytesIO
|
||||||
import os.path
|
import os.path
|
||||||
import json
|
import json
|
||||||
|
import tempfile
|
||||||
|
import mimetypes
|
||||||
|
|
||||||
|
try:
|
||||||
|
import magic
|
||||||
|
except ImportError:
|
||||||
|
print("[Warning] Magic is not installed, using file extensions to guess mime types")
|
||||||
|
magic = None
|
||||||
from PIL import Image
|
from PIL import Image
|
||||||
|
|
||||||
from . import matrix
|
from . import matrix
|
||||||
|
|
||||||
open_utf8 = partial(open, encoding='UTF-8')
|
open_utf8 = partial(open, encoding='UTF-8')
|
||||||
|
|
||||||
def convert_image(data: bytes) -> (bytes, int, int):
|
|
||||||
|
def guess_mime(data: bytes) -> str:
|
||||||
|
mime = None
|
||||||
|
if magic:
|
||||||
|
try:
|
||||||
|
return magic.Magic(mime=True).from_buffer(data)
|
||||||
|
except Exception:
|
||||||
|
pass
|
||||||
|
else:
|
||||||
|
with tempfile.NamedTemporaryFile(delete=False) as temp:
|
||||||
|
temp.write(data)
|
||||||
|
temp.close()
|
||||||
|
mime, _ = mimetypes.guess_type(temp.name)
|
||||||
|
return mime or "image/png"
|
||||||
|
|
||||||
|
|
||||||
|
def video_to_gif(data: bytes, mime: str) -> bytes:
|
||||||
|
from moviepy.editor import VideoFileClip
|
||||||
|
ext = mimetypes.guess_extension(mime)
|
||||||
|
with tempfile.NamedTemporaryFile(suffix=ext) as temp:
|
||||||
|
temp.write(data)
|
||||||
|
temp.flush()
|
||||||
|
with tempfile.NamedTemporaryFile(suffix=".gif") as gif:
|
||||||
|
clip = VideoFileClip(temp.name)
|
||||||
|
clip.write_gif(gif.name, logger=None)
|
||||||
|
gif.seek(0)
|
||||||
|
return gif.read()
|
||||||
|
|
||||||
|
|
||||||
|
def _convert_image(data: bytes) -> (bytes, int, int):
|
||||||
image: Image.Image = Image.open(BytesIO(data)).convert("RGBA")
|
image: Image.Image = Image.open(BytesIO(data)).convert("RGBA")
|
||||||
new_file = BytesIO()
|
new_file = BytesIO()
|
||||||
image.save(new_file, "png")
|
image.save(new_file, "png")
|
||||||
|
@ -40,6 +76,24 @@ def convert_image(data: bytes) -> (bytes, int, int):
|
||||||
return new_file.getvalue(), w, h
|
return new_file.getvalue(), w, h
|
||||||
|
|
||||||
|
|
||||||
|
def convert_image(data: bytes) -> (bytes, str, int, int):
|
||||||
|
mimetype = guess_mime(data)
|
||||||
|
if mimetype.startswith("video/"):
|
||||||
|
data = video_to_gif(data, mimetype)
|
||||||
|
print(".", end="", flush=True)
|
||||||
|
mimetype = "image/gif"
|
||||||
|
try:
|
||||||
|
rlt = _convert_image(data)
|
||||||
|
return rlt[0], mimetype, rlt[1], rlt[2]
|
||||||
|
except Exception as e:
|
||||||
|
print(f"Error converting image, mimetype: {mimetype}")
|
||||||
|
ext = mimetypes.guess_extension(mimetype)
|
||||||
|
with tempfile.NamedTemporaryFile(delete=False, suffix=ext) as temp:
|
||||||
|
temp.write(data)
|
||||||
|
print(f"Saved to {temp.name}")
|
||||||
|
raise e
|
||||||
|
|
||||||
|
|
||||||
def add_to_index(name: str, output_dir: str) -> None:
|
def add_to_index(name: str, output_dir: str) -> None:
|
||||||
index_path = os.path.join(output_dir, "index.json")
|
index_path = os.path.join(output_dir, "index.json")
|
||||||
try:
|
try:
|
||||||
|
@ -57,7 +111,7 @@ def add_to_index(name: str, output_dir: str) -> None:
|
||||||
|
|
||||||
|
|
||||||
def make_sticker(mxc: str, width: int, height: int, size: int,
|
def make_sticker(mxc: str, width: int, height: int, size: int,
|
||||||
body: str = "") -> matrix.StickerInfo:
|
mimetype: str, body: str = "") -> matrix.StickerInfo:
|
||||||
return {
|
return {
|
||||||
"body": body,
|
"body": body,
|
||||||
"url": mxc,
|
"url": mxc,
|
||||||
|
@ -65,7 +119,7 @@ def make_sticker(mxc: str, width: int, height: int, size: int,
|
||||||
"w": width,
|
"w": width,
|
||||||
"h": height,
|
"h": height,
|
||||||
"size": size,
|
"size": size,
|
||||||
"mimetype": "image/png",
|
"mimetype": mimetype,
|
||||||
|
|
||||||
# Element iOS compatibility hack
|
# Element iOS compatibility hack
|
||||||
"thumbnail_url": mxc,
|
"thumbnail_url": mxc,
|
||||||
|
@ -73,7 +127,7 @@ def make_sticker(mxc: str, width: int, height: int, size: int,
|
||||||
"w": width,
|
"w": width,
|
||||||
"h": height,
|
"h": height,
|
||||||
"size": size,
|
"size": size,
|
||||||
"mimetype": "image/png",
|
"mimetype": mimetype,
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
"msgtype": "m.sticker",
|
"msgtype": "m.sticker",
|
||||||
|
|
|
@ -77,11 +77,11 @@ async def upload_sticker(file: str, directory: str, old_stickers: Dict[str, matr
|
||||||
}
|
}
|
||||||
print(f".. using existing upload")
|
print(f".. using existing upload")
|
||||||
else:
|
else:
|
||||||
image_data, width, height = util.convert_image(image_data)
|
image_data, mimetype, width, height = util.convert_image(image_data)
|
||||||
print(".", end="", flush=True)
|
print(".", end="", flush=True)
|
||||||
mxc = await matrix.upload(image_data, "image/png", file)
|
mxc = await matrix.upload(image_data, mimetype, file)
|
||||||
print(".", end="", flush=True)
|
print(".", end="", flush=True)
|
||||||
sticker = util.make_sticker(mxc, width, height, len(image_data), name)
|
sticker = util.make_sticker(mxc, width, height, len(image_data), mimetype, name)
|
||||||
sticker["id"] = sticker_id
|
sticker["id"] = sticker_id
|
||||||
print(" uploaded", flush=True)
|
print(" uploaded", flush=True)
|
||||||
return sticker
|
return sticker
|
||||||
|
|
|
@ -33,11 +33,11 @@ async def reupload_document(client: TelegramClient, document: Document) -> matri
|
||||||
print(f"Reuploading {document.id}", end="", flush=True)
|
print(f"Reuploading {document.id}", end="", flush=True)
|
||||||
data = await client.download_media(document, file=bytes)
|
data = await client.download_media(document, file=bytes)
|
||||||
print(".", end="", flush=True)
|
print(".", end="", flush=True)
|
||||||
data, width, height = util.convert_image(data)
|
data, mimetype, width, height = util.convert_image(data)
|
||||||
print(".", end="", flush=True)
|
print(".", end="", flush=True)
|
||||||
mxc = await matrix.upload(data, "image/png", f"{document.id}.png")
|
mxc = await matrix.upload(data, mimetype, f"{document.id}.png")
|
||||||
print(".", flush=True)
|
print(".", flush=True)
|
||||||
return util.make_sticker(mxc, width, height, len(data))
|
return util.make_sticker(mxc, width, height, len(data), mimetype)
|
||||||
|
|
||||||
|
|
||||||
def add_meta(document: Document, info: matrix.StickerInfo, pack: StickerSetFull) -> None:
|
def add_meta(document: Document, info: matrix.StickerInfo, pack: StickerSetFull) -> None:
|
||||||
|
@ -81,7 +81,11 @@ async def reupload_pack(client: TelegramClient, pack: StickerSetFull, output_dir
|
||||||
reuploaded_documents[document.id] = already_uploaded[document.id]
|
reuploaded_documents[document.id] = already_uploaded[document.id]
|
||||||
print(f"Skipped reuploading {document.id}")
|
print(f"Skipped reuploading {document.id}")
|
||||||
except KeyError:
|
except KeyError:
|
||||||
|
try:
|
||||||
reuploaded_documents[document.id] = await reupload_document(client, document)
|
reuploaded_documents[document.id] = await reupload_document(client, document)
|
||||||
|
except Exception as e:
|
||||||
|
print(f"Failed to reupload {document.id}: {e}")
|
||||||
|
continue
|
||||||
# Always ensure the body and telegram metadata is correct
|
# Always ensure the body and telegram metadata is correct
|
||||||
add_meta(document, reuploaded_documents[document.id], pack)
|
add_meta(document, reuploaded_documents[document.id], pack)
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue