Compare commits

...

5 Commits

Author SHA1 Message Date
xz-dev 5742ac56ce
perf: optimize gif via gifsicle 2024-07-16 22:13:15 +08:00
xz-dev 0c2dedd7d4
fix: fix webm duration via ffmpeg 2024-07-16 21:17:07 +08:00
xz-dev 1b866cf4c9
fix: support animated sticker convert(RGBA) 2024-07-16 18:58:10 +08:00
xz-dev b63d16d41f
feat: support animated(lottie) sticker 2024-07-16 18:16:38 +08:00
xz-dev 788ff1e568
feat: support animated(webm) sticker 2024-07-16 16:07:54 +08:00
4 changed files with 153 additions and 15 deletions

View File

@ -4,3 +4,4 @@ pillow
telethon telethon
cryptg cryptg
python-magic python-magic
lottie[all]

View File

@ -16,19 +16,104 @@
from functools import partial from functools import partial
from io import BytesIO from io import BytesIO
import os.path import os.path
import subprocess
import json import json
import tempfile
import mimetypes
from PIL import Image try:
import magic
except ImportError:
print("[Warning] Magic is not installed, using file extensions to guess mime types")
magic = None
from PIL import Image, ImageSequence
from . import matrix from . import matrix
open_utf8 = partial(open, encoding='UTF-8') open_utf8 = partial(open, encoding='UTF-8')
def convert_image(data: bytes) -> (bytes, int, int):
image: Image.Image = Image.open(BytesIO(data)).convert("RGBA") def guess_mime(data: bytes) -> str:
mime = None
if magic:
try:
return magic.Magic(mime=True).from_buffer(data)
except Exception:
pass
else:
with tempfile.NamedTemporaryFile() as temp:
temp.write(data)
temp.close()
mime, _ = mimetypes.guess_type(temp.name)
return mime or "image/png"
def video_to_gif(data: bytes, mime: str) -> bytes:
ext = mimetypes.guess_extension(mime)
if mime.startswith("video/"):
# run ffmpeg to fix duration
with tempfile.NamedTemporaryFile(suffix=ext) as temp:
temp.write(data)
temp.flush()
with tempfile.NamedTemporaryFile(suffix=ext) as temp_fixed:
print(".", end="", flush=True)
result = subprocess.run(["ffmpeg", "-y", "-i", temp.name, "-codec", "copy", temp_fixed.name],
capture_output=True)
if result.returncode != 0:
raise RuntimeError(f"Run ffmpeg failed with code {result.returncode}, Error occurred:\n{result.stderr}")
temp_fixed.seek(0)
data = temp_fixed.read()
with tempfile.NamedTemporaryFile(suffix=ext) as temp:
temp.write(data)
temp.flush()
with tempfile.NamedTemporaryFile(suffix=".gif") as gif:
from moviepy.editor import VideoFileClip
clip = VideoFileClip(temp.name)
clip.write_gif(gif.name, logger=None)
gif.seek(0)
return gif.read()
def opermize_gif(data: bytes) -> bytes:
with tempfile.NamedTemporaryFile() as gif:
gif.write(data)
gif.flush()
# use gifsicle to optimize gif
result = subprocess.run(["gifsicle", "--batch", "--optimize=3", "--colors=256", gif.name],
capture_output=True)
if result.returncode != 0:
raise RuntimeError(f"Run gifsicle failed with code {result.returncode}, Error occurred:\n{result.stderr}")
gif.seek(0)
return gif.read()
def _convert_image(data: bytes, mimetype: str) -> (bytes, int, int):
image: Image.Image = Image.open(BytesIO(data))
new_file = BytesIO() new_file = BytesIO()
image.save(new_file, "png") suffix = mimetypes.guess_extension(mimetype)
w, h = image.size if suffix:
suffix = suffix[1:]
# Determine if the image is a GIF
is_animated = getattr(image, "is_animated", False)
if is_animated:
frames = [frame.convert("RGBA") for frame in ImageSequence.Iterator(image)]
# Save the new GIF
frames[0].save(
new_file,
format='GIF',
save_all=True,
append_images=frames[1:],
loop=image.info.get('loop', 0), # Default loop to 0 if not present
duration=image.info.get('duration', 100), # Set a default duration if not present
transparency=image.info.get('transparency', 255), # Default to 255 if transparency is not present
disposal=image.info.get('disposal', 2) # Default to disposal method 2 (restore to background)
)
# Get the size of the first frame to determine resizing
w, h = frames[0].size
else:
image = image.convert("RGBA")
image.save(new_file, format=suffix)
w, h = image.size
if w > 256 or h > 256: if w > 256 or h > 256:
# Set the width and height to lower values so clients wouldn't show them as huge images # Set the width and height to lower values so clients wouldn't show them as huge images
if w > h: if w > h:
@ -40,6 +125,54 @@ def convert_image(data: bytes) -> (bytes, int, int):
return new_file.getvalue(), w, h return new_file.getvalue(), w, h
def _convert_sticker(data: bytes) -> (bytes, str, int, int):
mimetype = guess_mime(data)
if mimetype.startswith("video/"):
data = video_to_gif(data, mimetype)
print(".", end="", flush=True)
mimetype = "image/gif"
elif mimetype.startswith("application/gzip"):
print(".", end="", flush=True)
# unzip file
import gzip
with gzip.open(BytesIO(data), "rb") as f:
data = f.read()
mimetype = guess_mime(data)
suffix = mimetypes.guess_extension(mimetype)
with tempfile.NamedTemporaryFile(suffix=suffix) as temp:
temp.write(data)
with tempfile.NamedTemporaryFile(suffix=".gif") as gif:
# run lottie_convert.py input output
print(".", end="", flush=True)
import subprocess
cmd = ["lottie_convert.py", temp.name, gif.name]
result = subprocess.run(cmd, capture_output=True, text=True)
if result.returncode != 0:
raise RuntimeError(f"Run {cmd} failed with code {retcode}, Error occurred:\n{result.stderr}")
gif.seek(0)
data = gif.read()
mimetype = "image/gif"
rlt = _convert_image(data, mimetype)
data = rlt[0]
if mimetype == "image/gif":
print(".", end="", flush=True)
data = opermize_gif(data)
return data, mimetype, rlt[1], rlt[2]
def convert_sticker(data: bytes) -> (bytes, str, int, int):
try:
return _convert_sticker(data)
except Exception as e:
mimetype = guess_mime(data)
print(f"Error converting image, mimetype: {mimetype}")
ext = mimetypes.guess_extension(mimetype)
with tempfile.NamedTemporaryFile(delete=False, suffix=ext) as temp:
temp.write(data)
print(f"Saved to {temp.name}")
raise e
def add_to_index(name: str, output_dir: str) -> None: def add_to_index(name: str, output_dir: str) -> None:
index_path = os.path.join(output_dir, "index.json") index_path = os.path.join(output_dir, "index.json")
try: try:
@ -57,7 +190,7 @@ def add_to_index(name: str, output_dir: str) -> None:
def make_sticker(mxc: str, width: int, height: int, size: int, def make_sticker(mxc: str, width: int, height: int, size: int,
body: str = "") -> matrix.StickerInfo: mimetype: str, body: str = "") -> matrix.StickerInfo:
return { return {
"body": body, "body": body,
"url": mxc, "url": mxc,
@ -65,7 +198,7 @@ def make_sticker(mxc: str, width: int, height: int, size: int,
"w": width, "w": width,
"h": height, "h": height,
"size": size, "size": size,
"mimetype": "image/png", "mimetype": mimetype,
# Element iOS compatibility hack # Element iOS compatibility hack
"thumbnail_url": mxc, "thumbnail_url": mxc,
@ -73,7 +206,7 @@ def make_sticker(mxc: str, width: int, height: int, size: int,
"w": width, "w": width,
"h": height, "h": height,
"size": size, "size": size,
"mimetype": "image/png", "mimetype": mimetype,
}, },
}, },
"msgtype": "m.sticker", "msgtype": "m.sticker",

View File

@ -77,11 +77,11 @@ async def upload_sticker(file: str, directory: str, old_stickers: Dict[str, matr
} }
print(f".. using existing upload") print(f".. using existing upload")
else: else:
image_data, width, height = util.convert_image(image_data) image_data, mimetype, width, height = util.convert_sticker(image_data)
print(".", end="", flush=True) print(".", end="", flush=True)
mxc = await matrix.upload(image_data, "image/png", file) mxc = await matrix.upload(image_data, mimetype, file)
print(".", end="", flush=True) print(".", end="", flush=True)
sticker = util.make_sticker(mxc, width, height, len(image_data), name) sticker = util.make_sticker(mxc, width, height, len(image_data), mimetype, name)
sticker["id"] = sticker_id sticker["id"] = sticker_id
print(" uploaded", flush=True) print(" uploaded", flush=True)
return sticker return sticker

View File

@ -33,11 +33,11 @@ async def reupload_document(client: TelegramClient, document: Document) -> matri
print(f"Reuploading {document.id}", end="", flush=True) print(f"Reuploading {document.id}", end="", flush=True)
data = await client.download_media(document, file=bytes) data = await client.download_media(document, file=bytes)
print(".", end="", flush=True) print(".", end="", flush=True)
data, width, height = util.convert_image(data) data, mimetype, width, height = util.convert_sticker(data)
print(".", end="", flush=True) print(".", end="", flush=True)
mxc = await matrix.upload(data, "image/png", f"{document.id}.png") mxc = await matrix.upload(data, mimetype, f"{document.id}.png")
print(".", flush=True) print(".", flush=True)
return util.make_sticker(mxc, width, height, len(data)) return util.make_sticker(mxc, width, height, len(data), mimetype)
def add_meta(document: Document, info: matrix.StickerInfo, pack: StickerSetFull) -> None: def add_meta(document: Document, info: matrix.StickerInfo, pack: StickerSetFull) -> None:
@ -81,7 +81,11 @@ async def reupload_pack(client: TelegramClient, pack: StickerSetFull, output_dir
reuploaded_documents[document.id] = already_uploaded[document.id] reuploaded_documents[document.id] = already_uploaded[document.id]
print(f"Skipped reuploading {document.id}") print(f"Skipped reuploading {document.id}")
except KeyError: except KeyError:
reuploaded_documents[document.id] = await reupload_document(client, document) try:
reuploaded_documents[document.id] = await reupload_document(client, document)
except Exception as e:
print(f"Failed to reupload {document.id}: {e}")
continue
# Always ensure the body and telegram metadata is correct # Always ensure the body and telegram metadata is correct
add_meta(document, reuploaded_documents[document.id], pack) add_meta(document, reuploaded_documents[document.id], pack)