Compare commits

..

No commits in common. "86cb2edcfa4e04114514438a4c25e0e4170e093b" and "a83bc15208f6ddf59b555c8f30c0e6496cda2a89" have entirely different histories.

1 changed files with 29 additions and 102 deletions

View File

@ -15,7 +15,6 @@
# along with this program. If not, see <https://www.gnu.org/licenses/>.
from functools import partial
from io import BytesIO
import numpy as np
import os.path
import subprocess
import json
@ -27,7 +26,7 @@ try:
except ImportError:
print("[Warning] Magic is not installed, using file extensions to guess mime types")
magic = None
from PIL import Image, ImageSequence, ImageFilter
from PIL import Image, ImageSequence
from . import matrix
@ -49,7 +48,7 @@ def guess_mime(data: bytes) -> str:
return mime or "image/png"
def _video_to_webp(data: bytes) -> bytes:
def video_to_webp(data: bytes) -> bytes:
mime = guess_mime(data)
ext = mimetypes.guess_extension(mime)
with tempfile.NamedTemporaryFile(suffix=ext) as video:
@ -75,102 +74,32 @@ def _video_to_webp(data: bytes) -> bytes:
return webp.read()
def video_to_webp(data: bytes) -> bytes:
mime = guess_mime(data)
def video_to_gif(data: bytes, mime: str) -> bytes:
ext = mimetypes.guess_extension(mime)
# run ffmpeg to fix duration
if mime.startswith("video/"):
# run ffmpeg to fix duration
with tempfile.NamedTemporaryFile(suffix=ext) as temp:
temp.write(data)
temp.flush()
with tempfile.NamedTemporaryFile(suffix=ext) as temp_fixed:
print(".", end="", flush=True)
result = subprocess.run(["ffmpeg", "-y", "-threads", "auto", "-i", temp.name, "-codec", "copy", temp_fixed.name],
capture_output=True)
if result.returncode != 0:
raise RuntimeError(f"Run ffmpeg failed with code {result.returncode}, Error occurred:\n{result.stderr}")
temp_fixed.seek(0)
data = temp_fixed.read()
data = video_to_webp(data)
with tempfile.NamedTemporaryFile(suffix=ext) as temp:
temp.write(data)
temp.flush()
with tempfile.NamedTemporaryFile(suffix=ext) as temp_fixed:
with tempfile.NamedTemporaryFile(suffix=".gif") as gif:
print(".", end="", flush=True)
result = subprocess.run(["ffmpeg", "-y", "-threads", "auto", "-i", temp.name, "-codec", "copy", temp_fixed.name],
capture_output=True)
if result.returncode != 0:
raise RuntimeError(f"Run ffmpeg failed with code {result.returncode}, Error occurred:\n{result.stderr}")
temp_fixed.seek(0)
data = temp_fixed.read()
return _video_to_webp(data)
def process_frame(frame):
"""
Process GIF frame, repair edges, ensure no white or semi-transparent pixels, while keeping color information intact.
"""
frame = frame.convert('RGBA')
# Decompose Alpha channel
alpha = frame.getchannel('A')
# Process Alpha channel with threshold, remove semi-transparent pixels
# Threshold can be adjusted as needed (0-255), 128 is the middle value
threshold = 128
alpha = alpha.point(lambda x: 255 if x >= threshold else 0)
# Process Alpha channel with MinFilter, remove edge noise
alpha = alpha.filter(ImageFilter.MinFilter(3))
# Process Alpha channel with MaxFilter, repair edges
alpha = alpha.filter(ImageFilter.MaxFilter(3))
# Apply processed Alpha channel back to image
frame.putalpha(alpha)
return frame
def webp_to_others(data: bytes, mimetype: str) -> bytes:
with tempfile.NamedTemporaryFile(suffix=".webp") as webp:
webp.write(data)
webp.flush()
ext = mimetypes.guess_extension(mimetype)
with tempfile.NamedTemporaryFile(suffix=ext) as img:
print(".", end="", flush=True)
im = Image.open(webp.name)
im = Image.open(temp.name)
im.info.pop('background', None)
if mimetype == "image/gif":
frames = []
duration = []
for frame in ImageSequence.Iterator(im):
frame = process_frame(frame)
frames.append(frame)
duration.append(frame.info.get('duration', 100))
frames[0].save(img.name, save_all=True, lossless=True, quality=100, method=6,
append_images=frames[1:], loop=0, duration=duration, disposal=2)
else:
im.save(img.name, save_all=True, lossless=True, quality=100, method=6)
img.seek(0)
return img.read()
def is_uniform_animated_webp(data: bytes) -> bool:
img = Image.open(BytesIO(data))
if img.n_frames <= 1:
return False
first_frame = np.array(img)
for frame_number in range(1, img.n_frames):
img.seek(frame_number)
current_frame = np.array(img)
if not np.array_equal(first_frame, current_frame):
return False
return True
def webp_to_gif_or_png(data: bytes) -> bytes:
# check if the webp is animated
image: Image.Image = Image.open(BytesIO(data))
is_animated = getattr(image, "is_animated", False)
if is_animated and not is_uniform_animated_webp(data):
return webp_to_others(data, "image/gif")
else:
# convert to png
return webp_to_others(data, "image/png")
im.save(gif.name, save_all=True, lossless=True, quality=100, method=6)
gif.seek(0)
return gif.read()
def opermize_gif(data: bytes) -> bytes:
@ -189,6 +118,9 @@ def opermize_gif(data: bytes) -> bytes:
def _convert_image(data: bytes, mimetype: str) -> (bytes, int, int):
image: Image.Image = Image.open(BytesIO(data))
new_file = BytesIO()
suffix = mimetypes.guess_extension(mimetype)
if suffix:
suffix = suffix[1:]
# Determine if the image is a GIF
is_animated = getattr(image, "is_animated", False)
if is_animated:
@ -206,9 +138,6 @@ def _convert_image(data: bytes, mimetype: str) -> (bytes, int, int):
# Get the size of the first frame to determine resizing
w, h = frames[0].size
else:
suffix = mimetypes.guess_extension(mimetype)
if suffix:
suffix = suffix[1:]
image = image.convert("RGBA")
image.save(new_file, format=suffix)
w, h = image.size
@ -226,8 +155,9 @@ def _convert_image(data: bytes, mimetype: str) -> (bytes, int, int):
def _convert_sticker(data: bytes) -> (bytes, str, int, int):
mimetype = guess_mime(data)
if mimetype.startswith("video/"):
data = video_to_webp(data)
data = video_to_gif(data, mimetype)
print(".", end="", flush=True)
mimetype = "image/gif"
elif mimetype.startswith("application/gzip"):
print(".", end="", flush=True)
# unzip file
@ -238,7 +168,7 @@ def _convert_sticker(data: bytes) -> (bytes, str, int, int):
suffix = mimetypes.guess_extension(mimetype)
with tempfile.NamedTemporaryFile(suffix=suffix) as temp:
temp.write(data)
with tempfile.NamedTemporaryFile(suffix=".webp") as gif:
with tempfile.NamedTemporaryFile(suffix=".gif") as gif:
# run lottie_convert.py input output
print(".", end="", flush=True)
import subprocess
@ -249,10 +179,7 @@ def _convert_sticker(data: bytes) -> (bytes, str, int, int):
raise RuntimeError(f"Run {cmd} failed with code {retcode}, Error occurred:\n{result.stderr}")
gif.seek(0)
data = gif.read()
mimetype = guess_mime(data)
if mimetype == "image/webp":
data = webp_to_gif_or_png(data)
mimetype = guess_mime(data)
mimetype = "image/gif"
rlt = _convert_image(data, mimetype)
data = rlt[0]
if mimetype == "image/gif":