feat: support animated(lottie) sticker

This commit is contained in:
xz-dev 2024-07-16 18:15:49 +08:00
parent 715f62af58
commit be477874e3
No known key found for this signature in database
GPG Key ID: A20912F811313E36
4 changed files with 32 additions and 5 deletions

View File

@ -5,3 +5,4 @@ telethon
cryptg cryptg
python-magic python-magic
moviepy moviepy
lottie[all]

View File

@ -76,16 +76,42 @@ def _convert_image(data: bytes) -> (bytes, int, int):
return new_file.getvalue(), w, h return new_file.getvalue(), w, h
def convert_image(data: bytes) -> (bytes, str, int, int): def _convert_sticker(data: bytes) -> (bytes, str, int, int):
mimetype = guess_mime(data) mimetype = guess_mime(data)
if mimetype.startswith("video/"): if mimetype.startswith("video/"):
data = video_to_gif(data, mimetype) data = video_to_gif(data, mimetype)
print(".", end="", flush=True) print(".", end="", flush=True)
mimetype = "image/gif" mimetype = "image/gif"
elif mimetype.startswith("application/gzip"):
print(".", end="", flush=True)
# unzip file
import gzip
with gzip.open(BytesIO(data), "rb") as f:
data = f.read()
mimetype = guess_mime(data)
suffix = mimetypes.guess_extension(mimetype)
with tempfile.NamedTemporaryFile(suffix=suffix) as temp:
temp.write(data)
with tempfile.NamedTemporaryFile(suffix=".gif") as gif:
# run lottie_convert.py input output
print(".", end="", flush=True)
import subprocess
cmd = ["lottie_convert.py", temp.name, gif.name]
result = subprocess.run(cmd, capture_output=True, text=True)
if result.returncode != 0:
raise RuntimeError(f"Run {cmd} failed with code {retcode}, Error occurred:\n{result.stderr}")
gif.seek(0)
data = gif.read()
mimetype = "image/gif"
rlt = _convert_image(data)
return rlt[0], mimetype, rlt[1], rlt[2]
def convert_sticker(data: bytes) -> (bytes, str, int, int):
try: try:
rlt = _convert_image(data) return _convert_sticker(data)
return rlt[0], mimetype, rlt[1], rlt[2]
except Exception as e: except Exception as e:
mimetype = guess_mime(data)
print(f"Error converting image, mimetype: {mimetype}") print(f"Error converting image, mimetype: {mimetype}")
ext = mimetypes.guess_extension(mimetype) ext = mimetypes.guess_extension(mimetype)
with tempfile.NamedTemporaryFile(delete=False, suffix=ext) as temp: with tempfile.NamedTemporaryFile(delete=False, suffix=ext) as temp:

View File

@ -77,7 +77,7 @@ async def upload_sticker(file: str, directory: str, old_stickers: Dict[str, matr
} }
print(f".. using existing upload") print(f".. using existing upload")
else: else:
image_data, mimetype, width, height = util.convert_image(image_data) image_data, mimetype, width, height = util.convert_sticker(image_data)
print(".", end="", flush=True) print(".", end="", flush=True)
mxc = await matrix.upload(image_data, mimetype, file) mxc = await matrix.upload(image_data, mimetype, file)
print(".", end="", flush=True) print(".", end="", flush=True)

View File

@ -33,7 +33,7 @@ async def reupload_document(client: TelegramClient, document: Document) -> matri
print(f"Reuploading {document.id}", end="", flush=True) print(f"Reuploading {document.id}", end="", flush=True)
data = await client.download_media(document, file=bytes) data = await client.download_media(document, file=bytes)
print(".", end="", flush=True) print(".", end="", flush=True)
data, mimetype, width, height = util.convert_image(data) data, mimetype, width, height = util.convert_sticker(data)
print(".", end="", flush=True) print(".", end="", flush=True)
mxc = await matrix.upload(data, mimetype, f"{document.id}.png") mxc = await matrix.upload(data, mimetype, f"{document.id}.png")
print(".", flush=True) print(".", flush=True)