Compare commits
1 Commits
0550c0ad3e
...
e5905c56dd
Author | SHA1 | Date |
---|---|---|
xz-dev | e5905c56dd |
|
@ -15,7 +15,6 @@
|
||||||
# along with this program. If not, see <https://www.gnu.org/licenses/>.
|
# along with this program. If not, see <https://www.gnu.org/licenses/>.
|
||||||
from functools import partial
|
from functools import partial
|
||||||
from io import BytesIO
|
from io import BytesIO
|
||||||
import numpy as np
|
|
||||||
import os.path
|
import os.path
|
||||||
import subprocess
|
import subprocess
|
||||||
import json
|
import json
|
||||||
|
@ -27,7 +26,7 @@ try:
|
||||||
except ImportError:
|
except ImportError:
|
||||||
print("[Warning] Magic is not installed, using file extensions to guess mime types")
|
print("[Warning] Magic is not installed, using file extensions to guess mime types")
|
||||||
magic = None
|
magic = None
|
||||||
from PIL import Image, ImageSequence, ImageFilter
|
from PIL import Image, ImageSequence
|
||||||
|
|
||||||
from . import matrix
|
from . import matrix
|
||||||
|
|
||||||
|
@ -49,7 +48,7 @@ def guess_mime(data: bytes) -> str:
|
||||||
return mime or "image/png"
|
return mime or "image/png"
|
||||||
|
|
||||||
|
|
||||||
def _video_to_webp(data: bytes) -> bytes:
|
def video_to_webp(data: bytes) -> bytes:
|
||||||
mime = guess_mime(data)
|
mime = guess_mime(data)
|
||||||
ext = mimetypes.guess_extension(mime)
|
ext = mimetypes.guess_extension(mime)
|
||||||
with tempfile.NamedTemporaryFile(suffix=ext) as video:
|
with tempfile.NamedTemporaryFile(suffix=ext) as video:
|
||||||
|
@ -75,9 +74,9 @@ def _video_to_webp(data: bytes) -> bytes:
|
||||||
return webp.read()
|
return webp.read()
|
||||||
|
|
||||||
|
|
||||||
def video_to_webp(data: bytes) -> bytes:
|
def video_to_gif(data: bytes, mime: str) -> bytes:
|
||||||
mime = guess_mime(data)
|
|
||||||
ext = mimetypes.guess_extension(mime)
|
ext = mimetypes.guess_extension(mime)
|
||||||
|
if mime.startswith("video/"):
|
||||||
# run ffmpeg to fix duration
|
# run ffmpeg to fix duration
|
||||||
with tempfile.NamedTemporaryFile(suffix=ext) as temp:
|
with tempfile.NamedTemporaryFile(suffix=ext) as temp:
|
||||||
temp.write(data)
|
temp.write(data)
|
||||||
|
@ -90,87 +89,17 @@ def video_to_webp(data: bytes) -> bytes:
|
||||||
raise RuntimeError(f"Run ffmpeg failed with code {result.returncode}, Error occurred:\n{result.stderr}")
|
raise RuntimeError(f"Run ffmpeg failed with code {result.returncode}, Error occurred:\n{result.stderr}")
|
||||||
temp_fixed.seek(0)
|
temp_fixed.seek(0)
|
||||||
data = temp_fixed.read()
|
data = temp_fixed.read()
|
||||||
return _video_to_webp(data)
|
data = video_to_webp(data)
|
||||||
|
with tempfile.NamedTemporaryFile(suffix=ext) as temp:
|
||||||
|
temp.write(data)
|
||||||
def process_frame(frame):
|
temp.flush()
|
||||||
"""
|
with tempfile.NamedTemporaryFile(suffix=".gif") as gif:
|
||||||
Process GIF frame, repair edges, ensure no white or semi-transparent pixels, while keeping color information intact.
|
|
||||||
"""
|
|
||||||
frame = frame.convert('RGBA')
|
|
||||||
|
|
||||||
# Decompose Alpha channel
|
|
||||||
alpha = frame.getchannel('A')
|
|
||||||
|
|
||||||
# Process Alpha channel with threshold, remove semi-transparent pixels
|
|
||||||
# Threshold can be adjusted as needed (0-255), 128 is the middle value
|
|
||||||
threshold = 128
|
|
||||||
alpha = alpha.point(lambda x: 255 if x >= threshold else 0)
|
|
||||||
|
|
||||||
# Process Alpha channel with MinFilter, remove edge noise
|
|
||||||
alpha = alpha.filter(ImageFilter.MinFilter(3))
|
|
||||||
|
|
||||||
# Process Alpha channel with MaxFilter, repair edges
|
|
||||||
alpha = alpha.filter(ImageFilter.MaxFilter(3))
|
|
||||||
|
|
||||||
# Apply processed Alpha channel back to image
|
|
||||||
frame.putalpha(alpha)
|
|
||||||
|
|
||||||
return frame
|
|
||||||
|
|
||||||
|
|
||||||
def webp_to_others(data: bytes, mimetype: str) -> bytes:
|
|
||||||
with tempfile.NamedTemporaryFile(suffix=".webp") as webp:
|
|
||||||
webp.write(data)
|
|
||||||
webp.flush()
|
|
||||||
ext = mimetypes.guess_extension(mimetype)
|
|
||||||
with tempfile.NamedTemporaryFile(suffix=ext) as img:
|
|
||||||
print(".", end="", flush=True)
|
print(".", end="", flush=True)
|
||||||
im = Image.open(webp.name)
|
im = Image.open(temp.name)
|
||||||
im.info.pop('background', None)
|
im.info.pop('background', None)
|
||||||
|
im.save(gif.name, save_all=True, lossless=True, quality=100, method=6)
|
||||||
if mimetype == "image/gif":
|
gif.seek(0)
|
||||||
frames = []
|
return gif.read()
|
||||||
duration = []
|
|
||||||
|
|
||||||
for frame in ImageSequence.Iterator(im):
|
|
||||||
frame = process_frame(frame)
|
|
||||||
frames.append(frame)
|
|
||||||
duration.append(frame.info.get('duration', 100))
|
|
||||||
|
|
||||||
frames[0].save(img.name, save_all=True, lossless=True, quality=100, method=6,
|
|
||||||
append_images=frames[1:], loop=0, duration=duration, disposal=2)
|
|
||||||
else:
|
|
||||||
im.save(img.name, save_all=True, lossless=True, quality=100, method=6)
|
|
||||||
|
|
||||||
img.seek(0)
|
|
||||||
return img.read()
|
|
||||||
|
|
||||||
|
|
||||||
def is_uniform_animated_webp(data: bytes) -> bool:
|
|
||||||
img = Image.open(BytesIO(data))
|
|
||||||
if img.n_frames <= 1:
|
|
||||||
return False
|
|
||||||
|
|
||||||
first_frame = np.array(img)
|
|
||||||
for frame_number in range(1, img.n_frames):
|
|
||||||
img.seek(frame_number)
|
|
||||||
current_frame = np.array(img)
|
|
||||||
if not np.array_equal(first_frame, current_frame):
|
|
||||||
return False
|
|
||||||
|
|
||||||
return True
|
|
||||||
|
|
||||||
|
|
||||||
def webp_to_gif_or_png(data: bytes) -> bytes:
|
|
||||||
# check if the webp is animated
|
|
||||||
image: Image.Image = Image.open(BytesIO(data))
|
|
||||||
is_animated = getattr(image, "is_animated", False)
|
|
||||||
if is_animated and not is_uniform_animated_webp(data):
|
|
||||||
return webp_to_others(data, "image/gif")
|
|
||||||
else:
|
|
||||||
# convert to png
|
|
||||||
return webp_to_others(data, "image/png")
|
|
||||||
|
|
||||||
|
|
||||||
def opermize_gif(data: bytes) -> bytes:
|
def opermize_gif(data: bytes) -> bytes:
|
||||||
|
@ -189,6 +118,9 @@ def opermize_gif(data: bytes) -> bytes:
|
||||||
def _convert_image(data: bytes, mimetype: str) -> (bytes, int, int):
|
def _convert_image(data: bytes, mimetype: str) -> (bytes, int, int):
|
||||||
image: Image.Image = Image.open(BytesIO(data))
|
image: Image.Image = Image.open(BytesIO(data))
|
||||||
new_file = BytesIO()
|
new_file = BytesIO()
|
||||||
|
suffix = mimetypes.guess_extension(mimetype)
|
||||||
|
if suffix:
|
||||||
|
suffix = suffix[1:]
|
||||||
# Determine if the image is a GIF
|
# Determine if the image is a GIF
|
||||||
is_animated = getattr(image, "is_animated", False)
|
is_animated = getattr(image, "is_animated", False)
|
||||||
if is_animated:
|
if is_animated:
|
||||||
|
@ -206,9 +138,6 @@ def _convert_image(data: bytes, mimetype: str) -> (bytes, int, int):
|
||||||
# Get the size of the first frame to determine resizing
|
# Get the size of the first frame to determine resizing
|
||||||
w, h = frames[0].size
|
w, h = frames[0].size
|
||||||
else:
|
else:
|
||||||
suffix = mimetypes.guess_extension(mimetype)
|
|
||||||
if suffix:
|
|
||||||
suffix = suffix[1:]
|
|
||||||
image = image.convert("RGBA")
|
image = image.convert("RGBA")
|
||||||
image.save(new_file, format=suffix)
|
image.save(new_file, format=suffix)
|
||||||
w, h = image.size
|
w, h = image.size
|
||||||
|
@ -226,8 +155,9 @@ def _convert_image(data: bytes, mimetype: str) -> (bytes, int, int):
|
||||||
def _convert_sticker(data: bytes) -> (bytes, str, int, int):
|
def _convert_sticker(data: bytes) -> (bytes, str, int, int):
|
||||||
mimetype = guess_mime(data)
|
mimetype = guess_mime(data)
|
||||||
if mimetype.startswith("video/"):
|
if mimetype.startswith("video/"):
|
||||||
data = video_to_webp(data)
|
data = video_to_gif(data, mimetype)
|
||||||
print(".", end="", flush=True)
|
print(".", end="", flush=True)
|
||||||
|
mimetype = "image/gif"
|
||||||
elif mimetype.startswith("application/gzip"):
|
elif mimetype.startswith("application/gzip"):
|
||||||
print(".", end="", flush=True)
|
print(".", end="", flush=True)
|
||||||
# unzip file
|
# unzip file
|
||||||
|
@ -238,7 +168,7 @@ def _convert_sticker(data: bytes) -> (bytes, str, int, int):
|
||||||
suffix = mimetypes.guess_extension(mimetype)
|
suffix = mimetypes.guess_extension(mimetype)
|
||||||
with tempfile.NamedTemporaryFile(suffix=suffix) as temp:
|
with tempfile.NamedTemporaryFile(suffix=suffix) as temp:
|
||||||
temp.write(data)
|
temp.write(data)
|
||||||
with tempfile.NamedTemporaryFile(suffix=".webp") as gif:
|
with tempfile.NamedTemporaryFile(suffix=".gif") as gif:
|
||||||
# run lottie_convert.py input output
|
# run lottie_convert.py input output
|
||||||
print(".", end="", flush=True)
|
print(".", end="", flush=True)
|
||||||
import subprocess
|
import subprocess
|
||||||
|
@ -249,10 +179,7 @@ def _convert_sticker(data: bytes) -> (bytes, str, int, int):
|
||||||
raise RuntimeError(f"Run {cmd} failed with code {retcode}, Error occurred:\n{result.stderr}")
|
raise RuntimeError(f"Run {cmd} failed with code {retcode}, Error occurred:\n{result.stderr}")
|
||||||
gif.seek(0)
|
gif.seek(0)
|
||||||
data = gif.read()
|
data = gif.read()
|
||||||
mimetype = guess_mime(data)
|
mimetype = "image/gif"
|
||||||
if mimetype == "image/webp":
|
|
||||||
data = webp_to_gif_or_png(data)
|
|
||||||
mimetype = guess_mime(data)
|
|
||||||
rlt = _convert_image(data, mimetype)
|
rlt = _convert_image(data, mimetype)
|
||||||
data = rlt[0]
|
data = rlt[0]
|
||||||
if mimetype == "image/gif":
|
if mimetype == "image/gif":
|
||||||
|
|
Loading…
Reference in New Issue