feat: support animated(lottie) sticker
This commit is contained in:
parent
7769679ac3
commit
74cff4e863
|
@ -5,3 +5,4 @@ telethon
|
||||||
cryptg
|
cryptg
|
||||||
python-magic
|
python-magic
|
||||||
moviepy
|
moviepy
|
||||||
|
lottie[all]
|
||||||
|
|
|
@ -76,16 +76,42 @@ def _convert_image(data: bytes) -> (bytes, int, int):
|
||||||
return new_file.getvalue(), w, h
|
return new_file.getvalue(), w, h
|
||||||
|
|
||||||
|
|
||||||
def convert_image(data: bytes) -> (bytes, str, int, int):
|
def _convert_sticker(data: bytes) -> (bytes, str, int, int):
|
||||||
mimetype = guess_mime(data)
|
mimetype = guess_mime(data)
|
||||||
if mimetype.startswith("video/"):
|
if mimetype.startswith("video/"):
|
||||||
data = video_to_gif(data, mimetype)
|
data = video_to_gif(data, mimetype)
|
||||||
print(".", end="", flush=True)
|
print(".", end="", flush=True)
|
||||||
mimetype = "image/gif"
|
mimetype = "image/gif"
|
||||||
try:
|
elif mimetype.startswith("application/gzip"):
|
||||||
|
print(".", end="", flush=True)
|
||||||
|
# unzip file
|
||||||
|
import gzip
|
||||||
|
with gzip.open(BytesIO(data), "rb") as f:
|
||||||
|
data = f.read()
|
||||||
|
mimetype = guess_mime(data)
|
||||||
|
suffix = mimetypes.guess_extension(mimetype)
|
||||||
|
with tempfile.NamedTemporaryFile(suffix=suffix) as temp:
|
||||||
|
temp.write(data)
|
||||||
|
with tempfile.NamedTemporaryFile(suffix=".gif") as gif:
|
||||||
|
# run lottie_convert.py input output
|
||||||
|
print(".", end="", flush=True)
|
||||||
|
import subprocess
|
||||||
|
cmd = ["lottie_convert.py", temp.name, gif.name]
|
||||||
|
result = subprocess.run(cmd, capture_output=True, text=True)
|
||||||
|
if result.returncode != 0:
|
||||||
|
raise RuntimeError(f"Run {cmd} failed with code {retcode}, Error occurred:\n{result.stderr}")
|
||||||
|
gif.seek(0)
|
||||||
|
data = gif.read()
|
||||||
|
mimetype = "image/gif"
|
||||||
rlt = _convert_image(data)
|
rlt = _convert_image(data)
|
||||||
return rlt[0], mimetype, rlt[1], rlt[2]
|
return rlt[0], mimetype, rlt[1], rlt[2]
|
||||||
|
|
||||||
|
|
||||||
|
def convert_sticker(data: bytes) -> (bytes, str, int, int):
|
||||||
|
try:
|
||||||
|
return _convert_sticker(data)
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
|
mimetype = guess_mime(data)
|
||||||
print(f"Error converting image, mimetype: {mimetype}")
|
print(f"Error converting image, mimetype: {mimetype}")
|
||||||
ext = mimetypes.guess_extension(mimetype)
|
ext = mimetypes.guess_extension(mimetype)
|
||||||
with tempfile.NamedTemporaryFile(delete=False, suffix=ext) as temp:
|
with tempfile.NamedTemporaryFile(delete=False, suffix=ext) as temp:
|
||||||
|
|
|
@ -77,7 +77,7 @@ async def upload_sticker(file: str, directory: str, old_stickers: Dict[str, matr
|
||||||
}
|
}
|
||||||
print(f".. using existing upload")
|
print(f".. using existing upload")
|
||||||
else:
|
else:
|
||||||
image_data, mimetype, width, height = util.convert_image(image_data)
|
image_data, mimetype, width, height = util.convert_sticker(image_data)
|
||||||
print(".", end="", flush=True)
|
print(".", end="", flush=True)
|
||||||
mxc = await matrix.upload(image_data, mimetype, file)
|
mxc = await matrix.upload(image_data, mimetype, file)
|
||||||
print(".", end="", flush=True)
|
print(".", end="", flush=True)
|
||||||
|
|
|
@ -33,7 +33,7 @@ async def reupload_document(client: TelegramClient, document: Document) -> matri
|
||||||
print(f"Reuploading {document.id}", end="", flush=True)
|
print(f"Reuploading {document.id}", end="", flush=True)
|
||||||
data = await client.download_media(document, file=bytes)
|
data = await client.download_media(document, file=bytes)
|
||||||
print(".", end="", flush=True)
|
print(".", end="", flush=True)
|
||||||
data, mimetype, width, height = util.convert_image(data)
|
data, mimetype, width, height = util.convert_sticker(data)
|
||||||
print(".", end="", flush=True)
|
print(".", end="", flush=True)
|
||||||
mxc = await matrix.upload(data, mimetype, f"{document.id}.png")
|
mxc = await matrix.upload(data, mimetype, f"{document.id}.png")
|
||||||
print(".", flush=True)
|
print(".", flush=True)
|
||||||
|
|
Loading…
Reference in New Issue