Compare commits
2 Commits
a83bc15208
...
86cb2edcfa
Author | SHA1 | Date |
---|---|---|
xz-dev | 86cb2edcfa | |
xz-dev | a0ef9f84be |
|
@ -15,6 +15,7 @@
|
|||
# along with this program. If not, see <https://www.gnu.org/licenses/>.
|
||||
from functools import partial
|
||||
from io import BytesIO
|
||||
import numpy as np
|
||||
import os.path
|
||||
import subprocess
|
||||
import json
|
||||
|
@ -26,7 +27,7 @@ try:
|
|||
except ImportError:
|
||||
print("[Warning] Magic is not installed, using file extensions to guess mime types")
|
||||
magic = None
|
||||
from PIL import Image, ImageSequence
|
||||
from PIL import Image, ImageSequence, ImageFilter
|
||||
|
||||
from . import matrix
|
||||
|
||||
|
@ -48,7 +49,7 @@ def guess_mime(data: bytes) -> str:
|
|||
return mime or "image/png"
|
||||
|
||||
|
||||
def video_to_webp(data: bytes) -> bytes:
|
||||
def _video_to_webp(data: bytes) -> bytes:
|
||||
mime = guess_mime(data)
|
||||
ext = mimetypes.guess_extension(mime)
|
||||
with tempfile.NamedTemporaryFile(suffix=ext) as video:
|
||||
|
@ -74,32 +75,102 @@ def video_to_webp(data: bytes) -> bytes:
|
|||
return webp.read()
|
||||
|
||||
|
||||
def video_to_gif(data: bytes, mime: str) -> bytes:
|
||||
def video_to_webp(data: bytes) -> bytes:
|
||||
mime = guess_mime(data)
|
||||
ext = mimetypes.guess_extension(mime)
|
||||
if mime.startswith("video/"):
|
||||
# run ffmpeg to fix duration
|
||||
with tempfile.NamedTemporaryFile(suffix=ext) as temp:
|
||||
temp.write(data)
|
||||
temp.flush()
|
||||
with tempfile.NamedTemporaryFile(suffix=ext) as temp_fixed:
|
||||
print(".", end="", flush=True)
|
||||
result = subprocess.run(["ffmpeg", "-y", "-threads", "auto", "-i", temp.name, "-codec", "copy", temp_fixed.name],
|
||||
capture_output=True)
|
||||
if result.returncode != 0:
|
||||
raise RuntimeError(f"Run ffmpeg failed with code {result.returncode}, Error occurred:\n{result.stderr}")
|
||||
temp_fixed.seek(0)
|
||||
data = temp_fixed.read()
|
||||
data = video_to_webp(data)
|
||||
# run ffmpeg to fix duration
|
||||
with tempfile.NamedTemporaryFile(suffix=ext) as temp:
|
||||
temp.write(data)
|
||||
temp.flush()
|
||||
with tempfile.NamedTemporaryFile(suffix=".gif") as gif:
|
||||
with tempfile.NamedTemporaryFile(suffix=ext) as temp_fixed:
|
||||
print(".", end="", flush=True)
|
||||
im = Image.open(temp.name)
|
||||
result = subprocess.run(["ffmpeg", "-y", "-threads", "auto", "-i", temp.name, "-codec", "copy", temp_fixed.name],
|
||||
capture_output=True)
|
||||
if result.returncode != 0:
|
||||
raise RuntimeError(f"Run ffmpeg failed with code {result.returncode}, Error occurred:\n{result.stderr}")
|
||||
temp_fixed.seek(0)
|
||||
data = temp_fixed.read()
|
||||
return _video_to_webp(data)
|
||||
|
||||
|
||||
def process_frame(frame):
|
||||
"""
|
||||
Process GIF frame, repair edges, ensure no white or semi-transparent pixels, while keeping color information intact.
|
||||
"""
|
||||
frame = frame.convert('RGBA')
|
||||
|
||||
# Decompose Alpha channel
|
||||
alpha = frame.getchannel('A')
|
||||
|
||||
# Process Alpha channel with threshold, remove semi-transparent pixels
|
||||
# Threshold can be adjusted as needed (0-255), 128 is the middle value
|
||||
threshold = 128
|
||||
alpha = alpha.point(lambda x: 255 if x >= threshold else 0)
|
||||
|
||||
# Process Alpha channel with MinFilter, remove edge noise
|
||||
alpha = alpha.filter(ImageFilter.MinFilter(3))
|
||||
|
||||
# Process Alpha channel with MaxFilter, repair edges
|
||||
alpha = alpha.filter(ImageFilter.MaxFilter(3))
|
||||
|
||||
# Apply processed Alpha channel back to image
|
||||
frame.putalpha(alpha)
|
||||
|
||||
return frame
|
||||
|
||||
|
||||
def webp_to_others(data: bytes, mimetype: str) -> bytes:
|
||||
with tempfile.NamedTemporaryFile(suffix=".webp") as webp:
|
||||
webp.write(data)
|
||||
webp.flush()
|
||||
ext = mimetypes.guess_extension(mimetype)
|
||||
with tempfile.NamedTemporaryFile(suffix=ext) as img:
|
||||
print(".", end="", flush=True)
|
||||
im = Image.open(webp.name)
|
||||
im.info.pop('background', None)
|
||||
im.save(gif.name, save_all=True, lossless=True, quality=100, method=6)
|
||||
gif.seek(0)
|
||||
return gif.read()
|
||||
|
||||
if mimetype == "image/gif":
|
||||
frames = []
|
||||
duration = []
|
||||
|
||||
for frame in ImageSequence.Iterator(im):
|
||||
frame = process_frame(frame)
|
||||
frames.append(frame)
|
||||
duration.append(frame.info.get('duration', 100))
|
||||
|
||||
frames[0].save(img.name, save_all=True, lossless=True, quality=100, method=6,
|
||||
append_images=frames[1:], loop=0, duration=duration, disposal=2)
|
||||
else:
|
||||
im.save(img.name, save_all=True, lossless=True, quality=100, method=6)
|
||||
|
||||
img.seek(0)
|
||||
return img.read()
|
||||
|
||||
|
||||
def is_uniform_animated_webp(data: bytes) -> bool:
|
||||
img = Image.open(BytesIO(data))
|
||||
if img.n_frames <= 1:
|
||||
return False
|
||||
|
||||
first_frame = np.array(img)
|
||||
for frame_number in range(1, img.n_frames):
|
||||
img.seek(frame_number)
|
||||
current_frame = np.array(img)
|
||||
if not np.array_equal(first_frame, current_frame):
|
||||
return False
|
||||
|
||||
return True
|
||||
|
||||
|
||||
def webp_to_gif_or_png(data: bytes) -> bytes:
|
||||
# check if the webp is animated
|
||||
image: Image.Image = Image.open(BytesIO(data))
|
||||
is_animated = getattr(image, "is_animated", False)
|
||||
if is_animated and not is_uniform_animated_webp(data):
|
||||
return webp_to_others(data, "image/gif")
|
||||
else:
|
||||
# convert to png
|
||||
return webp_to_others(data, "image/png")
|
||||
|
||||
|
||||
def opermize_gif(data: bytes) -> bytes:
|
||||
|
@ -118,9 +189,6 @@ def opermize_gif(data: bytes) -> bytes:
|
|||
def _convert_image(data: bytes, mimetype: str) -> (bytes, int, int):
|
||||
image: Image.Image = Image.open(BytesIO(data))
|
||||
new_file = BytesIO()
|
||||
suffix = mimetypes.guess_extension(mimetype)
|
||||
if suffix:
|
||||
suffix = suffix[1:]
|
||||
# Determine if the image is a GIF
|
||||
is_animated = getattr(image, "is_animated", False)
|
||||
if is_animated:
|
||||
|
@ -138,6 +206,9 @@ def _convert_image(data: bytes, mimetype: str) -> (bytes, int, int):
|
|||
# Get the size of the first frame to determine resizing
|
||||
w, h = frames[0].size
|
||||
else:
|
||||
suffix = mimetypes.guess_extension(mimetype)
|
||||
if suffix:
|
||||
suffix = suffix[1:]
|
||||
image = image.convert("RGBA")
|
||||
image.save(new_file, format=suffix)
|
||||
w, h = image.size
|
||||
|
@ -155,9 +226,8 @@ def _convert_image(data: bytes, mimetype: str) -> (bytes, int, int):
|
|||
def _convert_sticker(data: bytes) -> (bytes, str, int, int):
|
||||
mimetype = guess_mime(data)
|
||||
if mimetype.startswith("video/"):
|
||||
data = video_to_gif(data, mimetype)
|
||||
data = video_to_webp(data)
|
||||
print(".", end="", flush=True)
|
||||
mimetype = "image/gif"
|
||||
elif mimetype.startswith("application/gzip"):
|
||||
print(".", end="", flush=True)
|
||||
# unzip file
|
||||
|
@ -168,7 +238,7 @@ def _convert_sticker(data: bytes) -> (bytes, str, int, int):
|
|||
suffix = mimetypes.guess_extension(mimetype)
|
||||
with tempfile.NamedTemporaryFile(suffix=suffix) as temp:
|
||||
temp.write(data)
|
||||
with tempfile.NamedTemporaryFile(suffix=".gif") as gif:
|
||||
with tempfile.NamedTemporaryFile(suffix=".webp") as gif:
|
||||
# run lottie_convert.py input output
|
||||
print(".", end="", flush=True)
|
||||
import subprocess
|
||||
|
@ -179,7 +249,10 @@ def _convert_sticker(data: bytes) -> (bytes, str, int, int):
|
|||
raise RuntimeError(f"Run {cmd} failed with code {retcode}, Error occurred:\n{result.stderr}")
|
||||
gif.seek(0)
|
||||
data = gif.read()
|
||||
mimetype = "image/gif"
|
||||
mimetype = guess_mime(data)
|
||||
if mimetype == "image/webp":
|
||||
data = webp_to_gif_or_png(data)
|
||||
mimetype = guess_mime(data)
|
||||
rlt = _convert_image(data, mimetype)
|
||||
data = rlt[0]
|
||||
if mimetype == "image/gif":
|
||||
|
|
Loading…
Reference in New Issue