Compare commits

...

5 Commits

Author SHA1 Message Date
xz-dev 78ce8aabea
perf: optimize gif via gifsicle 2024-07-17 07:14:10 +08:00
xz-dev fe2d21ebce
fix: fix webm duration via ffmpeg 2024-07-17 07:14:10 +08:00
xz-dev b468f54b39
fix: support animated sticker convert(RGBA) 2024-07-17 07:14:10 +08:00
xz-dev 74cff4e863
feat: support animated(lottie) sticker 2024-07-17 07:14:05 +08:00
xz-dev 7769679ac3
feat: support animated(webm) sticker 2024-07-17 07:13:45 +08:00
4 changed files with 154 additions and 15 deletions

View File

@ -4,3 +4,5 @@ pillow
telethon
cryptg
python-magic
moviepy
lottie[all]

View File

@ -16,18 +16,103 @@
from functools import partial
from io import BytesIO
import os.path
import subprocess
import json
import tempfile
import mimetypes
from PIL import Image
try:
import magic
except ImportError:
print("[Warning] Magic is not installed, using file extensions to guess mime types")
magic = None
from PIL import Image, ImageSequence
from . import matrix
open_utf8 = partial(open, encoding='UTF-8')
def convert_image(data: bytes) -> (bytes, int, int):
image: Image.Image = Image.open(BytesIO(data)).convert("RGBA")
def guess_mime(data: bytes) -> str:
mime = None
if magic:
try:
return magic.Magic(mime=True).from_buffer(data)
except Exception:
pass
else:
with tempfile.NamedTemporaryFile() as temp:
temp.write(data)
temp.close()
mime, _ = mimetypes.guess_type(temp.name)
return mime or "image/png"
def video_to_gif(data: bytes, mime: str) -> bytes:
ext = mimetypes.guess_extension(mime)
if mime.startswith("video/"):
# run ffmpeg to fix duration
with tempfile.NamedTemporaryFile(suffix=ext) as temp:
temp.write(data)
temp.flush()
with tempfile.NamedTemporaryFile(suffix=ext) as temp_fixed:
print(".", end="", flush=True)
result = subprocess.run(["ffmpeg", "-y", "-i", temp.name, "-codec", "copy", temp_fixed.name],
capture_output=True)
if result.returncode != 0:
raise RuntimeError(f"Run ffmpeg failed with code {result.returncode}, Error occurred:\n{result.stderr}")
temp_fixed.seek(0)
data = temp_fixed.read()
with tempfile.NamedTemporaryFile(suffix=ext) as temp:
temp.write(data)
temp.flush()
with tempfile.NamedTemporaryFile(suffix=".gif") as gif:
from moviepy.editor import VideoFileClip
clip = VideoFileClip(temp.name)
clip.write_gif(gif.name, logger=None)
gif.seek(0)
return gif.read()
def opermize_gif(data: bytes) -> bytes:
with tempfile.NamedTemporaryFile() as gif:
gif.write(data)
gif.flush()
# use gifsicle to optimize gif
result = subprocess.run(["gifsicle", "--batch", "--optimize=3", "--colors=256", gif.name],
capture_output=True)
if result.returncode != 0:
raise RuntimeError(f"Run gifsicle failed with code {result.returncode}, Error occurred:\n{result.stderr}")
gif.seek(0)
return gif.read()
def _convert_image(data: bytes, mimetype: str) -> (bytes, int, int):
image: Image.Image = Image.open(BytesIO(data))
new_file = BytesIO()
image.save(new_file, "png")
suffix = mimetypes.guess_extension(mimetype)
if suffix:
suffix = suffix[1:]
# Determine if the image is a GIF
is_animated = getattr(image, "is_animated", False)
if is_animated:
frames = [frame.convert("RGBA") for frame in ImageSequence.Iterator(image)]
# Save the new GIF
frames[0].save(
new_file,
format='GIF',
save_all=True,
append_images=frames[1:],
loop=image.info.get('loop', 0), # Default loop to 0 if not present
duration=image.info.get('duration', 100), # Set a default duration if not present
transparency=image.info.get('transparency', 255), # Default to 255 if transparency is not present
disposal=image.info.get('disposal', 2) # Default to disposal method 2 (restore to background)
)
# Get the size of the first frame to determine resizing
w, h = frames[0].size
else:
image = image.convert("RGBA")
image.save(new_file, format=suffix)
w, h = image.size
if w > 256 or h > 256:
# Set the width and height to lower values so clients wouldn't show them as huge images
@ -40,6 +125,54 @@ def convert_image(data: bytes) -> (bytes, int, int):
return new_file.getvalue(), w, h
def _convert_sticker(data: bytes) -> (bytes, str, int, int):
mimetype = guess_mime(data)
if mimetype.startswith("video/"):
data = video_to_gif(data, mimetype)
print(".", end="", flush=True)
mimetype = "image/gif"
elif mimetype.startswith("application/gzip"):
print(".", end="", flush=True)
# unzip file
import gzip
with gzip.open(BytesIO(data), "rb") as f:
data = f.read()
mimetype = guess_mime(data)
suffix = mimetypes.guess_extension(mimetype)
with tempfile.NamedTemporaryFile(suffix=suffix) as temp:
temp.write(data)
with tempfile.NamedTemporaryFile(suffix=".gif") as gif:
# run lottie_convert.py input output
print(".", end="", flush=True)
import subprocess
cmd = ["lottie_convert.py", temp.name, gif.name]
result = subprocess.run(cmd, capture_output=True, text=True)
if result.returncode != 0:
raise RuntimeError(f"Run {cmd} failed with code {retcode}, Error occurred:\n{result.stderr}")
gif.seek(0)
data = gif.read()
mimetype = "image/gif"
rlt = _convert_image(data, mimetype)
data = rlt[0]
if mimetype == "image/gif":
print(".", end="", flush=True)
data = opermize_gif(data)
return data, mimetype, rlt[1], rlt[2]
def convert_sticker(data: bytes) -> (bytes, str, int, int):
try:
return _convert_sticker(data)
except Exception as e:
mimetype = guess_mime(data)
print(f"Error converting image, mimetype: {mimetype}")
ext = mimetypes.guess_extension(mimetype)
with tempfile.NamedTemporaryFile(delete=False, suffix=ext) as temp:
temp.write(data)
print(f"Saved to {temp.name}")
raise e
def add_to_index(name: str, output_dir: str) -> None:
index_path = os.path.join(output_dir, "index.json")
try:
@ -57,7 +190,7 @@ def add_to_index(name: str, output_dir: str) -> None:
def make_sticker(mxc: str, width: int, height: int, size: int,
body: str = "") -> matrix.StickerInfo:
mimetype: str, body: str = "") -> matrix.StickerInfo:
return {
"body": body,
"url": mxc,
@ -65,7 +198,7 @@ def make_sticker(mxc: str, width: int, height: int, size: int,
"w": width,
"h": height,
"size": size,
"mimetype": "image/png",
"mimetype": mimetype,
# Element iOS compatibility hack
"thumbnail_url": mxc,
@ -73,7 +206,7 @@ def make_sticker(mxc: str, width: int, height: int, size: int,
"w": width,
"h": height,
"size": size,
"mimetype": "image/png",
"mimetype": mimetype,
},
},
"msgtype": "m.sticker",

View File

@ -77,11 +77,11 @@ async def upload_sticker(file: str, directory: str, old_stickers: Dict[str, matr
}
print(f".. using existing upload")
else:
image_data, width, height = util.convert_image(image_data)
image_data, mimetype, width, height = util.convert_sticker(image_data)
print(".", end="", flush=True)
mxc = await matrix.upload(image_data, "image/png", file)
mxc = await matrix.upload(image_data, mimetype, file)
print(".", end="", flush=True)
sticker = util.make_sticker(mxc, width, height, len(image_data), name)
sticker = util.make_sticker(mxc, width, height, len(image_data), mimetype, name)
sticker["id"] = sticker_id
print(" uploaded", flush=True)
return sticker

View File

@ -33,11 +33,11 @@ async def reupload_document(client: TelegramClient, document: Document) -> matri
print(f"Reuploading {document.id}", end="", flush=True)
data = await client.download_media(document, file=bytes)
print(".", end="", flush=True)
data, width, height = util.convert_image(data)
data, mimetype, width, height = util.convert_sticker(data)
print(".", end="", flush=True)
mxc = await matrix.upload(data, "image/png", f"{document.id}.png")
mxc = await matrix.upload(data, mimetype, f"{document.id}.png")
print(".", flush=True)
return util.make_sticker(mxc, width, height, len(data))
return util.make_sticker(mxc, width, height, len(data), mimetype)
def add_meta(document: Document, info: matrix.StickerInfo, pack: StickerSetFull) -> None:
@ -81,7 +81,11 @@ async def reupload_pack(client: TelegramClient, pack: StickerSetFull, output_dir
reuploaded_documents[document.id] = already_uploaded[document.id]
print(f"Skipped reuploading {document.id}")
except KeyError:
try:
reuploaded_documents[document.id] = await reupload_document(client, document)
except Exception as e:
print(f"Failed to reupload {document.id}: {e}")
continue
# Always ensure the body and telegram metadata is correct
add_meta(document, reuploaded_documents[document.id], pack)