Compare commits
3 Commits
e5905c56dd
...
0550c0ad3e
Author | SHA1 | Date |
---|---|---|
xz-dev | 0550c0ad3e | |
xz-dev | 86cb2edcfa | |
xz-dev | a0ef9f84be |
|
@ -15,6 +15,7 @@
|
||||||
# along with this program. If not, see <https://www.gnu.org/licenses/>.
|
# along with this program. If not, see <https://www.gnu.org/licenses/>.
|
||||||
from functools import partial
|
from functools import partial
|
||||||
from io import BytesIO
|
from io import BytesIO
|
||||||
|
import numpy as np
|
||||||
import os.path
|
import os.path
|
||||||
import subprocess
|
import subprocess
|
||||||
import json
|
import json
|
||||||
|
@ -26,7 +27,7 @@ try:
|
||||||
except ImportError:
|
except ImportError:
|
||||||
print("[Warning] Magic is not installed, using file extensions to guess mime types")
|
print("[Warning] Magic is not installed, using file extensions to guess mime types")
|
||||||
magic = None
|
magic = None
|
||||||
from PIL import Image, ImageSequence
|
from PIL import Image, ImageSequence, ImageFilter
|
||||||
|
|
||||||
from . import matrix
|
from . import matrix
|
||||||
|
|
||||||
|
@ -48,7 +49,7 @@ def guess_mime(data: bytes) -> str:
|
||||||
return mime or "image/png"
|
return mime or "image/png"
|
||||||
|
|
||||||
|
|
||||||
def video_to_webp(data: bytes) -> bytes:
|
def _video_to_webp(data: bytes) -> bytes:
|
||||||
mime = guess_mime(data)
|
mime = guess_mime(data)
|
||||||
ext = mimetypes.guess_extension(mime)
|
ext = mimetypes.guess_extension(mime)
|
||||||
with tempfile.NamedTemporaryFile(suffix=ext) as video:
|
with tempfile.NamedTemporaryFile(suffix=ext) as video:
|
||||||
|
@ -74,32 +75,102 @@ def video_to_webp(data: bytes) -> bytes:
|
||||||
return webp.read()
|
return webp.read()
|
||||||
|
|
||||||
|
|
||||||
def video_to_gif(data: bytes, mime: str) -> bytes:
|
def video_to_webp(data: bytes) -> bytes:
|
||||||
|
mime = guess_mime(data)
|
||||||
ext = mimetypes.guess_extension(mime)
|
ext = mimetypes.guess_extension(mime)
|
||||||
if mime.startswith("video/"):
|
# run ffmpeg to fix duration
|
||||||
# run ffmpeg to fix duration
|
|
||||||
with tempfile.NamedTemporaryFile(suffix=ext) as temp:
|
|
||||||
temp.write(data)
|
|
||||||
temp.flush()
|
|
||||||
with tempfile.NamedTemporaryFile(suffix=ext) as temp_fixed:
|
|
||||||
print(".", end="", flush=True)
|
|
||||||
result = subprocess.run(["ffmpeg", "-y", "-threads", "auto", "-i", temp.name, "-codec", "copy", temp_fixed.name],
|
|
||||||
capture_output=True)
|
|
||||||
if result.returncode != 0:
|
|
||||||
raise RuntimeError(f"Run ffmpeg failed with code {result.returncode}, Error occurred:\n{result.stderr}")
|
|
||||||
temp_fixed.seek(0)
|
|
||||||
data = temp_fixed.read()
|
|
||||||
data = video_to_webp(data)
|
|
||||||
with tempfile.NamedTemporaryFile(suffix=ext) as temp:
|
with tempfile.NamedTemporaryFile(suffix=ext) as temp:
|
||||||
temp.write(data)
|
temp.write(data)
|
||||||
temp.flush()
|
temp.flush()
|
||||||
with tempfile.NamedTemporaryFile(suffix=".gif") as gif:
|
with tempfile.NamedTemporaryFile(suffix=ext) as temp_fixed:
|
||||||
print(".", end="", flush=True)
|
print(".", end="", flush=True)
|
||||||
im = Image.open(temp.name)
|
result = subprocess.run(["ffmpeg", "-y", "-threads", "auto", "-i", temp.name, "-codec", "copy", temp_fixed.name],
|
||||||
|
capture_output=True)
|
||||||
|
if result.returncode != 0:
|
||||||
|
raise RuntimeError(f"Run ffmpeg failed with code {result.returncode}, Error occurred:\n{result.stderr}")
|
||||||
|
temp_fixed.seek(0)
|
||||||
|
data = temp_fixed.read()
|
||||||
|
return _video_to_webp(data)
|
||||||
|
|
||||||
|
|
||||||
|
def process_frame(frame):
|
||||||
|
"""
|
||||||
|
Process GIF frame, repair edges, ensure no white or semi-transparent pixels, while keeping color information intact.
|
||||||
|
"""
|
||||||
|
frame = frame.convert('RGBA')
|
||||||
|
|
||||||
|
# Decompose Alpha channel
|
||||||
|
alpha = frame.getchannel('A')
|
||||||
|
|
||||||
|
# Process Alpha channel with threshold, remove semi-transparent pixels
|
||||||
|
# Threshold can be adjusted as needed (0-255), 128 is the middle value
|
||||||
|
threshold = 128
|
||||||
|
alpha = alpha.point(lambda x: 255 if x >= threshold else 0)
|
||||||
|
|
||||||
|
# Process Alpha channel with MinFilter, remove edge noise
|
||||||
|
alpha = alpha.filter(ImageFilter.MinFilter(3))
|
||||||
|
|
||||||
|
# Process Alpha channel with MaxFilter, repair edges
|
||||||
|
alpha = alpha.filter(ImageFilter.MaxFilter(3))
|
||||||
|
|
||||||
|
# Apply processed Alpha channel back to image
|
||||||
|
frame.putalpha(alpha)
|
||||||
|
|
||||||
|
return frame
|
||||||
|
|
||||||
|
|
||||||
|
def webp_to_others(data: bytes, mimetype: str) -> bytes:
|
||||||
|
with tempfile.NamedTemporaryFile(suffix=".webp") as webp:
|
||||||
|
webp.write(data)
|
||||||
|
webp.flush()
|
||||||
|
ext = mimetypes.guess_extension(mimetype)
|
||||||
|
with tempfile.NamedTemporaryFile(suffix=ext) as img:
|
||||||
|
print(".", end="", flush=True)
|
||||||
|
im = Image.open(webp.name)
|
||||||
im.info.pop('background', None)
|
im.info.pop('background', None)
|
||||||
im.save(gif.name, save_all=True, lossless=True, quality=100, method=6)
|
|
||||||
gif.seek(0)
|
if mimetype == "image/gif":
|
||||||
return gif.read()
|
frames = []
|
||||||
|
duration = []
|
||||||
|
|
||||||
|
for frame in ImageSequence.Iterator(im):
|
||||||
|
frame = process_frame(frame)
|
||||||
|
frames.append(frame)
|
||||||
|
duration.append(frame.info.get('duration', 100))
|
||||||
|
|
||||||
|
frames[0].save(img.name, save_all=True, lossless=True, quality=100, method=6,
|
||||||
|
append_images=frames[1:], loop=0, duration=duration, disposal=2)
|
||||||
|
else:
|
||||||
|
im.save(img.name, save_all=True, lossless=True, quality=100, method=6)
|
||||||
|
|
||||||
|
img.seek(0)
|
||||||
|
return img.read()
|
||||||
|
|
||||||
|
|
||||||
|
def is_uniform_animated_webp(data: bytes) -> bool:
|
||||||
|
img = Image.open(BytesIO(data))
|
||||||
|
if img.n_frames <= 1:
|
||||||
|
return False
|
||||||
|
|
||||||
|
first_frame = np.array(img)
|
||||||
|
for frame_number in range(1, img.n_frames):
|
||||||
|
img.seek(frame_number)
|
||||||
|
current_frame = np.array(img)
|
||||||
|
if not np.array_equal(first_frame, current_frame):
|
||||||
|
return False
|
||||||
|
|
||||||
|
return True
|
||||||
|
|
||||||
|
|
||||||
|
def webp_to_gif_or_png(data: bytes) -> bytes:
|
||||||
|
# check if the webp is animated
|
||||||
|
image: Image.Image = Image.open(BytesIO(data))
|
||||||
|
is_animated = getattr(image, "is_animated", False)
|
||||||
|
if is_animated and not is_uniform_animated_webp(data):
|
||||||
|
return webp_to_others(data, "image/gif")
|
||||||
|
else:
|
||||||
|
# convert to png
|
||||||
|
return webp_to_others(data, "image/png")
|
||||||
|
|
||||||
|
|
||||||
def opermize_gif(data: bytes) -> bytes:
|
def opermize_gif(data: bytes) -> bytes:
|
||||||
|
@ -118,9 +189,6 @@ def opermize_gif(data: bytes) -> bytes:
|
||||||
def _convert_image(data: bytes, mimetype: str) -> (bytes, int, int):
|
def _convert_image(data: bytes, mimetype: str) -> (bytes, int, int):
|
||||||
image: Image.Image = Image.open(BytesIO(data))
|
image: Image.Image = Image.open(BytesIO(data))
|
||||||
new_file = BytesIO()
|
new_file = BytesIO()
|
||||||
suffix = mimetypes.guess_extension(mimetype)
|
|
||||||
if suffix:
|
|
||||||
suffix = suffix[1:]
|
|
||||||
# Determine if the image is a GIF
|
# Determine if the image is a GIF
|
||||||
is_animated = getattr(image, "is_animated", False)
|
is_animated = getattr(image, "is_animated", False)
|
||||||
if is_animated:
|
if is_animated:
|
||||||
|
@ -138,6 +206,9 @@ def _convert_image(data: bytes, mimetype: str) -> (bytes, int, int):
|
||||||
# Get the size of the first frame to determine resizing
|
# Get the size of the first frame to determine resizing
|
||||||
w, h = frames[0].size
|
w, h = frames[0].size
|
||||||
else:
|
else:
|
||||||
|
suffix = mimetypes.guess_extension(mimetype)
|
||||||
|
if suffix:
|
||||||
|
suffix = suffix[1:]
|
||||||
image = image.convert("RGBA")
|
image = image.convert("RGBA")
|
||||||
image.save(new_file, format=suffix)
|
image.save(new_file, format=suffix)
|
||||||
w, h = image.size
|
w, h = image.size
|
||||||
|
@ -155,9 +226,8 @@ def _convert_image(data: bytes, mimetype: str) -> (bytes, int, int):
|
||||||
def _convert_sticker(data: bytes) -> (bytes, str, int, int):
|
def _convert_sticker(data: bytes) -> (bytes, str, int, int):
|
||||||
mimetype = guess_mime(data)
|
mimetype = guess_mime(data)
|
||||||
if mimetype.startswith("video/"):
|
if mimetype.startswith("video/"):
|
||||||
data = video_to_gif(data, mimetype)
|
data = video_to_webp(data)
|
||||||
print(".", end="", flush=True)
|
print(".", end="", flush=True)
|
||||||
mimetype = "image/gif"
|
|
||||||
elif mimetype.startswith("application/gzip"):
|
elif mimetype.startswith("application/gzip"):
|
||||||
print(".", end="", flush=True)
|
print(".", end="", flush=True)
|
||||||
# unzip file
|
# unzip file
|
||||||
|
@ -168,7 +238,7 @@ def _convert_sticker(data: bytes) -> (bytes, str, int, int):
|
||||||
suffix = mimetypes.guess_extension(mimetype)
|
suffix = mimetypes.guess_extension(mimetype)
|
||||||
with tempfile.NamedTemporaryFile(suffix=suffix) as temp:
|
with tempfile.NamedTemporaryFile(suffix=suffix) as temp:
|
||||||
temp.write(data)
|
temp.write(data)
|
||||||
with tempfile.NamedTemporaryFile(suffix=".gif") as gif:
|
with tempfile.NamedTemporaryFile(suffix=".webp") as gif:
|
||||||
# run lottie_convert.py input output
|
# run lottie_convert.py input output
|
||||||
print(".", end="", flush=True)
|
print(".", end="", flush=True)
|
||||||
import subprocess
|
import subprocess
|
||||||
|
@ -179,7 +249,10 @@ def _convert_sticker(data: bytes) -> (bytes, str, int, int):
|
||||||
raise RuntimeError(f"Run {cmd} failed with code {retcode}, Error occurred:\n{result.stderr}")
|
raise RuntimeError(f"Run {cmd} failed with code {retcode}, Error occurred:\n{result.stderr}")
|
||||||
gif.seek(0)
|
gif.seek(0)
|
||||||
data = gif.read()
|
data = gif.read()
|
||||||
mimetype = "image/gif"
|
mimetype = guess_mime(data)
|
||||||
|
if mimetype == "image/webp":
|
||||||
|
data = webp_to_gif_or_png(data)
|
||||||
|
mimetype = guess_mime(data)
|
||||||
rlt = _convert_image(data, mimetype)
|
rlt = _convert_image(data, mimetype)
|
||||||
data = rlt[0]
|
data = rlt[0]
|
||||||
if mimetype == "image/gif":
|
if mimetype == "image/gif":
|
||||||
|
|
Loading…
Reference in New Issue