Compare commits

..

No commits in common. "86cb2edcfa4e04114514438a4c25e0e4170e093b" and "a83bc15208f6ddf59b555c8f30c0e6496cda2a89" have entirely different histories.

1 changed files with 29 additions and 102 deletions

View File

@ -15,7 +15,6 @@
# along with this program. If not, see <https://www.gnu.org/licenses/>. # along with this program. If not, see <https://www.gnu.org/licenses/>.
from functools import partial from functools import partial
from io import BytesIO from io import BytesIO
import numpy as np
import os.path import os.path
import subprocess import subprocess
import json import json
@ -27,7 +26,7 @@ try:
except ImportError: except ImportError:
print("[Warning] Magic is not installed, using file extensions to guess mime types") print("[Warning] Magic is not installed, using file extensions to guess mime types")
magic = None magic = None
from PIL import Image, ImageSequence, ImageFilter from PIL import Image, ImageSequence
from . import matrix from . import matrix
@ -49,7 +48,7 @@ def guess_mime(data: bytes) -> str:
return mime or "image/png" return mime or "image/png"
def _video_to_webp(data: bytes) -> bytes: def video_to_webp(data: bytes) -> bytes:
mime = guess_mime(data) mime = guess_mime(data)
ext = mimetypes.guess_extension(mime) ext = mimetypes.guess_extension(mime)
with tempfile.NamedTemporaryFile(suffix=ext) as video: with tempfile.NamedTemporaryFile(suffix=ext) as video:
@ -75,102 +74,32 @@ def _video_to_webp(data: bytes) -> bytes:
return webp.read() return webp.read()
def video_to_webp(data: bytes) -> bytes: def video_to_gif(data: bytes, mime: str) -> bytes:
mime = guess_mime(data)
ext = mimetypes.guess_extension(mime) ext = mimetypes.guess_extension(mime)
# run ffmpeg to fix duration if mime.startswith("video/"):
# run ffmpeg to fix duration
with tempfile.NamedTemporaryFile(suffix=ext) as temp:
temp.write(data)
temp.flush()
with tempfile.NamedTemporaryFile(suffix=ext) as temp_fixed:
print(".", end="", flush=True)
result = subprocess.run(["ffmpeg", "-y", "-threads", "auto", "-i", temp.name, "-codec", "copy", temp_fixed.name],
capture_output=True)
if result.returncode != 0:
raise RuntimeError(f"Run ffmpeg failed with code {result.returncode}, Error occurred:\n{result.stderr}")
temp_fixed.seek(0)
data = temp_fixed.read()
data = video_to_webp(data)
with tempfile.NamedTemporaryFile(suffix=ext) as temp: with tempfile.NamedTemporaryFile(suffix=ext) as temp:
temp.write(data) temp.write(data)
temp.flush() temp.flush()
with tempfile.NamedTemporaryFile(suffix=ext) as temp_fixed: with tempfile.NamedTemporaryFile(suffix=".gif") as gif:
print(".", end="", flush=True) print(".", end="", flush=True)
result = subprocess.run(["ffmpeg", "-y", "-threads", "auto", "-i", temp.name, "-codec", "copy", temp_fixed.name], im = Image.open(temp.name)
capture_output=True)
if result.returncode != 0:
raise RuntimeError(f"Run ffmpeg failed with code {result.returncode}, Error occurred:\n{result.stderr}")
temp_fixed.seek(0)
data = temp_fixed.read()
return _video_to_webp(data)
def process_frame(frame):
"""
Process GIF frame, repair edges, ensure no white or semi-transparent pixels, while keeping color information intact.
"""
frame = frame.convert('RGBA')
# Decompose Alpha channel
alpha = frame.getchannel('A')
# Process Alpha channel with threshold, remove semi-transparent pixels
# Threshold can be adjusted as needed (0-255), 128 is the middle value
threshold = 128
alpha = alpha.point(lambda x: 255 if x >= threshold else 0)
# Process Alpha channel with MinFilter, remove edge noise
alpha = alpha.filter(ImageFilter.MinFilter(3))
# Process Alpha channel with MaxFilter, repair edges
alpha = alpha.filter(ImageFilter.MaxFilter(3))
# Apply processed Alpha channel back to image
frame.putalpha(alpha)
return frame
def webp_to_others(data: bytes, mimetype: str) -> bytes:
with tempfile.NamedTemporaryFile(suffix=".webp") as webp:
webp.write(data)
webp.flush()
ext = mimetypes.guess_extension(mimetype)
with tempfile.NamedTemporaryFile(suffix=ext) as img:
print(".", end="", flush=True)
im = Image.open(webp.name)
im.info.pop('background', None) im.info.pop('background', None)
im.save(gif.name, save_all=True, lossless=True, quality=100, method=6)
if mimetype == "image/gif": gif.seek(0)
frames = [] return gif.read()
duration = []
for frame in ImageSequence.Iterator(im):
frame = process_frame(frame)
frames.append(frame)
duration.append(frame.info.get('duration', 100))
frames[0].save(img.name, save_all=True, lossless=True, quality=100, method=6,
append_images=frames[1:], loop=0, duration=duration, disposal=2)
else:
im.save(img.name, save_all=True, lossless=True, quality=100, method=6)
img.seek(0)
return img.read()
def is_uniform_animated_webp(data: bytes) -> bool:
img = Image.open(BytesIO(data))
if img.n_frames <= 1:
return False
first_frame = np.array(img)
for frame_number in range(1, img.n_frames):
img.seek(frame_number)
current_frame = np.array(img)
if not np.array_equal(first_frame, current_frame):
return False
return True
def webp_to_gif_or_png(data: bytes) -> bytes:
# check if the webp is animated
image: Image.Image = Image.open(BytesIO(data))
is_animated = getattr(image, "is_animated", False)
if is_animated and not is_uniform_animated_webp(data):
return webp_to_others(data, "image/gif")
else:
# convert to png
return webp_to_others(data, "image/png")
def opermize_gif(data: bytes) -> bytes: def opermize_gif(data: bytes) -> bytes:
@ -189,6 +118,9 @@ def opermize_gif(data: bytes) -> bytes:
def _convert_image(data: bytes, mimetype: str) -> (bytes, int, int): def _convert_image(data: bytes, mimetype: str) -> (bytes, int, int):
image: Image.Image = Image.open(BytesIO(data)) image: Image.Image = Image.open(BytesIO(data))
new_file = BytesIO() new_file = BytesIO()
suffix = mimetypes.guess_extension(mimetype)
if suffix:
suffix = suffix[1:]
# Determine if the image is a GIF # Determine if the image is a GIF
is_animated = getattr(image, "is_animated", False) is_animated = getattr(image, "is_animated", False)
if is_animated: if is_animated:
@ -206,9 +138,6 @@ def _convert_image(data: bytes, mimetype: str) -> (bytes, int, int):
# Get the size of the first frame to determine resizing # Get the size of the first frame to determine resizing
w, h = frames[0].size w, h = frames[0].size
else: else:
suffix = mimetypes.guess_extension(mimetype)
if suffix:
suffix = suffix[1:]
image = image.convert("RGBA") image = image.convert("RGBA")
image.save(new_file, format=suffix) image.save(new_file, format=suffix)
w, h = image.size w, h = image.size
@ -226,8 +155,9 @@ def _convert_image(data: bytes, mimetype: str) -> (bytes, int, int):
def _convert_sticker(data: bytes) -> (bytes, str, int, int): def _convert_sticker(data: bytes) -> (bytes, str, int, int):
mimetype = guess_mime(data) mimetype = guess_mime(data)
if mimetype.startswith("video/"): if mimetype.startswith("video/"):
data = video_to_webp(data) data = video_to_gif(data, mimetype)
print(".", end="", flush=True) print(".", end="", flush=True)
mimetype = "image/gif"
elif mimetype.startswith("application/gzip"): elif mimetype.startswith("application/gzip"):
print(".", end="", flush=True) print(".", end="", flush=True)
# unzip file # unzip file
@ -238,7 +168,7 @@ def _convert_sticker(data: bytes) -> (bytes, str, int, int):
suffix = mimetypes.guess_extension(mimetype) suffix = mimetypes.guess_extension(mimetype)
with tempfile.NamedTemporaryFile(suffix=suffix) as temp: with tempfile.NamedTemporaryFile(suffix=suffix) as temp:
temp.write(data) temp.write(data)
with tempfile.NamedTemporaryFile(suffix=".webp") as gif: with tempfile.NamedTemporaryFile(suffix=".gif") as gif:
# run lottie_convert.py input output # run lottie_convert.py input output
print(".", end="", flush=True) print(".", end="", flush=True)
import subprocess import subprocess
@ -249,10 +179,7 @@ def _convert_sticker(data: bytes) -> (bytes, str, int, int):
raise RuntimeError(f"Run {cmd} failed with code {retcode}, Error occurred:\n{result.stderr}") raise RuntimeError(f"Run {cmd} failed with code {retcode}, Error occurred:\n{result.stderr}")
gif.seek(0) gif.seek(0)
data = gif.read() data = gif.read()
mimetype = guess_mime(data) mimetype = "image/gif"
if mimetype == "image/webp":
data = webp_to_gif_or_png(data)
mimetype = guess_mime(data)
rlt = _convert_image(data, mimetype) rlt = _convert_image(data, mimetype)
data = rlt[0] data = rlt[0]
if mimetype == "image/gif": if mimetype == "image/gif":