Compare commits

...

2 Commits

Author SHA1 Message Date
xz-dev e5905c56dd
Merge a83bc15208 into 333567f481 2024-09-14 23:04:03 +08:00
xz-dev a83bc15208
fix: keep webm transparency 2024-09-14 23:03:26 +08:00
2 changed files with 35 additions and 8 deletions

View File

@ -4,5 +4,4 @@ pillow
telethon telethon
cryptg cryptg
python-magic python-magic
moviepy
lottie[all] lottie[all]

View File

@ -48,6 +48,32 @@ def guess_mime(data: bytes) -> str:
return mime or "image/png" return mime or "image/png"
def video_to_webp(data: bytes) -> bytes:
mime = guess_mime(data)
ext = mimetypes.guess_extension(mime)
with tempfile.NamedTemporaryFile(suffix=ext) as video:
video.write(data)
video.flush()
with tempfile.NamedTemporaryFile(suffix=".webp") as webp:
print(".", end="", flush=True)
ffmpeg_encoder_args = []
if mime == "video/webm":
encode = subprocess.run(["ffprobe", "-v", "error", "-select_streams", "v:0", "-show_entries", "stream=codec_name", "-of", "default=nokey=1:noprint_wrappers=1", video.name], capture_output=True, text=True).stdout.strip()
ffmpeg_encoder = None
if encode == "vp8":
ffmpeg_encoder = "libvpx"
elif encode == "vp9":
ffmpeg_encoder = "libvpx-vp9"
if ffmpeg_encoder:
ffmpeg_encoder_args = ["-c:v", ffmpeg_encoder]
result = subprocess.run(["ffmpeg", "-y", "-threads", "auto", *ffmpeg_encoder_args, "-i", video.name, "-lossless", "1", webp.name],
capture_output=True)
if result.returncode != 0:
raise RuntimeError(f"Run ffmpeg failed with code {result.returncode}, Error occurred:\n{result.stderr}")
webp.seek(0)
return webp.read()
def video_to_gif(data: bytes, mime: str) -> bytes: def video_to_gif(data: bytes, mime: str) -> bytes:
ext = mimetypes.guess_extension(mime) ext = mimetypes.guess_extension(mime)
if mime.startswith("video/"): if mime.startswith("video/"):
@ -57,19 +83,21 @@ def video_to_gif(data: bytes, mime: str) -> bytes:
temp.flush() temp.flush()
with tempfile.NamedTemporaryFile(suffix=ext) as temp_fixed: with tempfile.NamedTemporaryFile(suffix=ext) as temp_fixed:
print(".", end="", flush=True) print(".", end="", flush=True)
result = subprocess.run(["ffmpeg", "-y", "-i", temp.name, "-codec", "copy", temp_fixed.name], result = subprocess.run(["ffmpeg", "-y", "-threads", "auto", "-i", temp.name, "-codec", "copy", temp_fixed.name],
capture_output=True) capture_output=True)
if result.returncode != 0: if result.returncode != 0:
raise RuntimeError(f"Run ffmpeg failed with code {result.returncode}, Error occurred:\n{result.stderr}") raise RuntimeError(f"Run ffmpeg failed with code {result.returncode}, Error occurred:\n{result.stderr}")
temp_fixed.seek(0) temp_fixed.seek(0)
data = temp_fixed.read() data = temp_fixed.read()
data = video_to_webp(data)
with tempfile.NamedTemporaryFile(suffix=ext) as temp: with tempfile.NamedTemporaryFile(suffix=ext) as temp:
temp.write(data) temp.write(data)
temp.flush() temp.flush()
with tempfile.NamedTemporaryFile(suffix=".gif") as gif: with tempfile.NamedTemporaryFile(suffix=".gif") as gif:
from moviepy.editor import VideoFileClip print(".", end="", flush=True)
clip = VideoFileClip(temp.name) im = Image.open(temp.name)
clip.write_gif(gif.name, logger=None) im.info.pop('background', None)
im.save(gif.name, save_all=True, lossless=True, quality=100, method=6)
gif.seek(0) gif.seek(0)
return gif.read() return gif.read()
@ -105,7 +133,6 @@ def _convert_image(data: bytes, mimetype: str) -> (bytes, int, int):
append_images=frames[1:], append_images=frames[1:],
loop=image.info.get('loop', 0), # Default loop to 0 if not present loop=image.info.get('loop', 0), # Default loop to 0 if not present
duration=image.info.get('duration', 100), # Set a default duration if not present duration=image.info.get('duration', 100), # Set a default duration if not present
transparency=image.info.get('transparency', 255), # Default to 255 if transparency is not present
disposal=image.info.get('disposal', 2) # Default to disposal method 2 (restore to background) disposal=image.info.get('disposal', 2) # Default to disposal method 2 (restore to background)
) )
# Get the size of the first frame to determine resizing # Get the size of the first frame to determine resizing
@ -147,7 +174,8 @@ def _convert_sticker(data: bytes) -> (bytes, str, int, int):
import subprocess import subprocess
cmd = ["lottie_convert.py", temp.name, gif.name] cmd = ["lottie_convert.py", temp.name, gif.name]
result = subprocess.run(cmd, capture_output=True, text=True) result = subprocess.run(cmd, capture_output=True, text=True)
if result.returncode != 0: retcode = result.returncode
if retcode != 0:
raise RuntimeError(f"Run {cmd} failed with code {retcode}, Error occurred:\n{result.stderr}") raise RuntimeError(f"Run {cmd} failed with code {retcode}, Error occurred:\n{result.stderr}")
gif.seek(0) gif.seek(0)
data = gif.read() data = gif.read()
@ -157,7 +185,7 @@ def _convert_sticker(data: bytes) -> (bytes, str, int, int):
if mimetype == "image/gif": if mimetype == "image/gif":
print(".", end="", flush=True) print(".", end="", flush=True)
data = opermize_gif(data) data = opermize_gif(data)
return data, mimetype, rlt[1], rlt[2] return data, mimetype, rlt[1], rlt[2]
def convert_sticker(data: bytes) -> (bytes, str, int, int): def convert_sticker(data: bytes) -> (bytes, str, int, int):