mirror of
				https://github.com/maunium/stickerpicker
				synced 2025-11-03 21:57:19 +00:00 
			
		
		
		
	Merge 78ce8aabea6d572569e5160d6daeffa692d7b65c into 333567f481e60443360aa7199d481e1a45b3a523
This commit is contained in:
		
						commit
						6e490e052b
					
				@ -4,3 +4,5 @@ pillow
 | 
				
			|||||||
telethon
 | 
					telethon
 | 
				
			||||||
cryptg
 | 
					cryptg
 | 
				
			||||||
python-magic
 | 
					python-magic
 | 
				
			||||||
 | 
					moviepy
 | 
				
			||||||
 | 
					lottie[all]
 | 
				
			||||||
 | 
				
			|||||||
@ -16,19 +16,104 @@
 | 
				
			|||||||
from functools import partial
 | 
					from functools import partial
 | 
				
			||||||
from io import BytesIO
 | 
					from io import BytesIO
 | 
				
			||||||
import os.path
 | 
					import os.path
 | 
				
			||||||
 | 
					import subprocess
 | 
				
			||||||
import json
 | 
					import json
 | 
				
			||||||
 | 
					import tempfile
 | 
				
			||||||
 | 
					import mimetypes
 | 
				
			||||||
 | 
					
 | 
				
			||||||
from PIL import Image
 | 
					try:
 | 
				
			||||||
 | 
					    import magic
 | 
				
			||||||
 | 
					except ImportError:
 | 
				
			||||||
 | 
					    print("[Warning] Magic is not installed, using file extensions to guess mime types")
 | 
				
			||||||
 | 
					    magic = None
 | 
				
			||||||
 | 
					from PIL import Image, ImageSequence
 | 
				
			||||||
 | 
					
 | 
				
			||||||
from . import matrix
 | 
					from . import matrix
 | 
				
			||||||
 | 
					
 | 
				
			||||||
open_utf8 = partial(open, encoding='UTF-8')
 | 
					open_utf8 = partial(open, encoding='UTF-8')
 | 
				
			||||||
 | 
					
 | 
				
			||||||
def convert_image(data: bytes) -> (bytes, int, int):
 | 
					
 | 
				
			||||||
    image: Image.Image = Image.open(BytesIO(data)).convert("RGBA")
 | 
					def guess_mime(data: bytes) -> str:
 | 
				
			||||||
 | 
					    mime = None
 | 
				
			||||||
 | 
					    if magic:
 | 
				
			||||||
 | 
					        try:
 | 
				
			||||||
 | 
					            return magic.Magic(mime=True).from_buffer(data)
 | 
				
			||||||
 | 
					        except Exception:
 | 
				
			||||||
 | 
					            pass
 | 
				
			||||||
 | 
					    else:
 | 
				
			||||||
 | 
					        with tempfile.NamedTemporaryFile() as temp:
 | 
				
			||||||
 | 
					            temp.write(data)
 | 
				
			||||||
 | 
					            temp.close()
 | 
				
			||||||
 | 
					            mime, _ = mimetypes.guess_type(temp.name)
 | 
				
			||||||
 | 
					    return mime or "image/png"
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					def video_to_gif(data: bytes, mime: str) -> bytes:
 | 
				
			||||||
 | 
					    ext = mimetypes.guess_extension(mime)
 | 
				
			||||||
 | 
					    if mime.startswith("video/"):
 | 
				
			||||||
 | 
					        # run ffmpeg to fix duration
 | 
				
			||||||
 | 
					        with tempfile.NamedTemporaryFile(suffix=ext) as temp:
 | 
				
			||||||
 | 
					            temp.write(data)
 | 
				
			||||||
 | 
					            temp.flush()
 | 
				
			||||||
 | 
					            with tempfile.NamedTemporaryFile(suffix=ext) as temp_fixed:
 | 
				
			||||||
 | 
					                print(".", end="", flush=True)
 | 
				
			||||||
 | 
					                result = subprocess.run(["ffmpeg", "-y", "-i", temp.name, "-codec", "copy", temp_fixed.name],
 | 
				
			||||||
 | 
					                                        capture_output=True)
 | 
				
			||||||
 | 
					                if result.returncode != 0:
 | 
				
			||||||
 | 
					                    raise RuntimeError(f"Run ffmpeg failed with code {result.returncode}, Error occurred:\n{result.stderr}")
 | 
				
			||||||
 | 
					                temp_fixed.seek(0)
 | 
				
			||||||
 | 
					                data = temp_fixed.read()
 | 
				
			||||||
 | 
					    with tempfile.NamedTemporaryFile(suffix=ext) as temp:
 | 
				
			||||||
 | 
					        temp.write(data)
 | 
				
			||||||
 | 
					        temp.flush()
 | 
				
			||||||
 | 
					        with tempfile.NamedTemporaryFile(suffix=".gif") as gif:
 | 
				
			||||||
 | 
					            from moviepy.editor import VideoFileClip
 | 
				
			||||||
 | 
					            clip = VideoFileClip(temp.name)
 | 
				
			||||||
 | 
					            clip.write_gif(gif.name, logger=None)
 | 
				
			||||||
 | 
					            gif.seek(0)
 | 
				
			||||||
 | 
					            return gif.read()
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					def opermize_gif(data: bytes) -> bytes:
 | 
				
			||||||
 | 
					    with tempfile.NamedTemporaryFile() as gif:
 | 
				
			||||||
 | 
					        gif.write(data)
 | 
				
			||||||
 | 
					        gif.flush()
 | 
				
			||||||
 | 
					        # use gifsicle to optimize gif
 | 
				
			||||||
 | 
					        result = subprocess.run(["gifsicle", "--batch", "--optimize=3", "--colors=256", gif.name],
 | 
				
			||||||
 | 
					                                capture_output=True)
 | 
				
			||||||
 | 
					        if result.returncode != 0:
 | 
				
			||||||
 | 
					            raise RuntimeError(f"Run gifsicle failed with code {result.returncode}, Error occurred:\n{result.stderr}")
 | 
				
			||||||
 | 
					        gif.seek(0)
 | 
				
			||||||
 | 
					        return gif.read()
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					def _convert_image(data: bytes, mimetype: str) -> (bytes, int, int):
 | 
				
			||||||
 | 
					    image: Image.Image = Image.open(BytesIO(data))
 | 
				
			||||||
    new_file = BytesIO()
 | 
					    new_file = BytesIO()
 | 
				
			||||||
    image.save(new_file, "png")
 | 
					    suffix = mimetypes.guess_extension(mimetype)
 | 
				
			||||||
    w, h = image.size
 | 
					    if suffix:
 | 
				
			||||||
 | 
					        suffix = suffix[1:]
 | 
				
			||||||
 | 
					    # Determine if the image is a GIF
 | 
				
			||||||
 | 
					    is_animated = getattr(image, "is_animated", False)
 | 
				
			||||||
 | 
					    if is_animated:
 | 
				
			||||||
 | 
					        frames = [frame.convert("RGBA") for frame in ImageSequence.Iterator(image)]
 | 
				
			||||||
 | 
					        # Save the new GIF
 | 
				
			||||||
 | 
					        frames[0].save(
 | 
				
			||||||
 | 
					            new_file,
 | 
				
			||||||
 | 
					            format='GIF',
 | 
				
			||||||
 | 
					            save_all=True,
 | 
				
			||||||
 | 
					            append_images=frames[1:],
 | 
				
			||||||
 | 
					            loop=image.info.get('loop', 0),  # Default loop to 0 if not present
 | 
				
			||||||
 | 
					            duration=image.info.get('duration', 100),  # Set a default duration if not present
 | 
				
			||||||
 | 
					            transparency=image.info.get('transparency', 255),  # Default to 255 if transparency is not present
 | 
				
			||||||
 | 
					            disposal=image.info.get('disposal', 2)  # Default to disposal method 2 (restore to background)
 | 
				
			||||||
 | 
					        )
 | 
				
			||||||
 | 
					        # Get the size of the first frame to determine resizing
 | 
				
			||||||
 | 
					        w, h = frames[0].size
 | 
				
			||||||
 | 
					    else:
 | 
				
			||||||
 | 
					        image = image.convert("RGBA")
 | 
				
			||||||
 | 
					        image.save(new_file, format=suffix)
 | 
				
			||||||
 | 
					        w, h = image.size
 | 
				
			||||||
    if w > 256 or h > 256:
 | 
					    if w > 256 or h > 256:
 | 
				
			||||||
        # Set the width and height to lower values so clients wouldn't show them as huge images
 | 
					        # Set the width and height to lower values so clients wouldn't show them as huge images
 | 
				
			||||||
        if w > h:
 | 
					        if w > h:
 | 
				
			||||||
@ -40,6 +125,54 @@ def convert_image(data: bytes) -> (bytes, int, int):
 | 
				
			|||||||
    return new_file.getvalue(), w, h
 | 
					    return new_file.getvalue(), w, h
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					def _convert_sticker(data: bytes) -> (bytes, str, int, int):
 | 
				
			||||||
 | 
					    mimetype = guess_mime(data)
 | 
				
			||||||
 | 
					    if mimetype.startswith("video/"):
 | 
				
			||||||
 | 
					        data = video_to_gif(data, mimetype)
 | 
				
			||||||
 | 
					        print(".", end="", flush=True)
 | 
				
			||||||
 | 
					        mimetype = "image/gif"
 | 
				
			||||||
 | 
					    elif mimetype.startswith("application/gzip"):
 | 
				
			||||||
 | 
					        print(".", end="", flush=True)
 | 
				
			||||||
 | 
					        # unzip file
 | 
				
			||||||
 | 
					        import gzip
 | 
				
			||||||
 | 
					        with gzip.open(BytesIO(data), "rb") as f:
 | 
				
			||||||
 | 
					            data = f.read()
 | 
				
			||||||
 | 
					            mimetype = guess_mime(data)
 | 
				
			||||||
 | 
					            suffix = mimetypes.guess_extension(mimetype)
 | 
				
			||||||
 | 
					            with tempfile.NamedTemporaryFile(suffix=suffix) as temp:
 | 
				
			||||||
 | 
					                temp.write(data)
 | 
				
			||||||
 | 
					                with tempfile.NamedTemporaryFile(suffix=".gif") as gif:
 | 
				
			||||||
 | 
					                    # run lottie_convert.py input output
 | 
				
			||||||
 | 
					                    print(".", end="", flush=True)
 | 
				
			||||||
 | 
					                    import subprocess
 | 
				
			||||||
 | 
					                    cmd = ["lottie_convert.py", temp.name, gif.name]
 | 
				
			||||||
 | 
					                    result = subprocess.run(cmd, capture_output=True, text=True)
 | 
				
			||||||
 | 
					                    if result.returncode != 0:
 | 
				
			||||||
 | 
					                        raise RuntimeError(f"Run {cmd} failed with code {retcode}, Error occurred:\n{result.stderr}")
 | 
				
			||||||
 | 
					                    gif.seek(0)
 | 
				
			||||||
 | 
					                    data = gif.read()
 | 
				
			||||||
 | 
					                    mimetype = "image/gif"
 | 
				
			||||||
 | 
					    rlt = _convert_image(data, mimetype)
 | 
				
			||||||
 | 
					    data = rlt[0]
 | 
				
			||||||
 | 
					    if mimetype == "image/gif":
 | 
				
			||||||
 | 
					        print(".", end="", flush=True)
 | 
				
			||||||
 | 
					        data = opermize_gif(data)
 | 
				
			||||||
 | 
					    return  data, mimetype, rlt[1], rlt[2]
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					def convert_sticker(data: bytes) -> (bytes, str, int, int):
 | 
				
			||||||
 | 
					    try:
 | 
				
			||||||
 | 
					        return _convert_sticker(data)
 | 
				
			||||||
 | 
					    except Exception as e:
 | 
				
			||||||
 | 
					        mimetype = guess_mime(data)
 | 
				
			||||||
 | 
					        print(f"Error converting image, mimetype: {mimetype}")
 | 
				
			||||||
 | 
					        ext = mimetypes.guess_extension(mimetype)
 | 
				
			||||||
 | 
					        with tempfile.NamedTemporaryFile(delete=False, suffix=ext) as temp:
 | 
				
			||||||
 | 
					            temp.write(data)
 | 
				
			||||||
 | 
					            print(f"Saved to {temp.name}")
 | 
				
			||||||
 | 
					        raise e
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					
 | 
				
			||||||
def add_to_index(name: str, output_dir: str) -> None:
 | 
					def add_to_index(name: str, output_dir: str) -> None:
 | 
				
			||||||
    index_path = os.path.join(output_dir, "index.json")
 | 
					    index_path = os.path.join(output_dir, "index.json")
 | 
				
			||||||
    try:
 | 
					    try:
 | 
				
			||||||
@ -57,7 +190,7 @@ def add_to_index(name: str, output_dir: str) -> None:
 | 
				
			|||||||
 | 
					
 | 
				
			||||||
 | 
					
 | 
				
			||||||
def make_sticker(mxc: str, width: int, height: int, size: int,
 | 
					def make_sticker(mxc: str, width: int, height: int, size: int,
 | 
				
			||||||
                 body: str = "") -> matrix.StickerInfo:
 | 
					                 mimetype: str, body: str = "") -> matrix.StickerInfo:
 | 
				
			||||||
    return {
 | 
					    return {
 | 
				
			||||||
        "body": body,
 | 
					        "body": body,
 | 
				
			||||||
        "url": mxc,
 | 
					        "url": mxc,
 | 
				
			||||||
@ -65,7 +198,7 @@ def make_sticker(mxc: str, width: int, height: int, size: int,
 | 
				
			|||||||
            "w": width,
 | 
					            "w": width,
 | 
				
			||||||
            "h": height,
 | 
					            "h": height,
 | 
				
			||||||
            "size": size,
 | 
					            "size": size,
 | 
				
			||||||
            "mimetype": "image/png",
 | 
					            "mimetype": mimetype,
 | 
				
			||||||
 | 
					
 | 
				
			||||||
            # Element iOS compatibility hack
 | 
					            # Element iOS compatibility hack
 | 
				
			||||||
            "thumbnail_url": mxc,
 | 
					            "thumbnail_url": mxc,
 | 
				
			||||||
@ -73,7 +206,7 @@ def make_sticker(mxc: str, width: int, height: int, size: int,
 | 
				
			|||||||
                "w": width,
 | 
					                "w": width,
 | 
				
			||||||
                "h": height,
 | 
					                "h": height,
 | 
				
			||||||
                "size": size,
 | 
					                "size": size,
 | 
				
			||||||
                "mimetype": "image/png",
 | 
					                "mimetype": mimetype,
 | 
				
			||||||
            },
 | 
					            },
 | 
				
			||||||
        },
 | 
					        },
 | 
				
			||||||
        "msgtype": "m.sticker",
 | 
					        "msgtype": "m.sticker",
 | 
				
			||||||
 | 
				
			|||||||
@ -77,11 +77,11 @@ async def upload_sticker(file: str, directory: str, old_stickers: Dict[str, matr
 | 
				
			|||||||
        }
 | 
					        }
 | 
				
			||||||
        print(f".. using existing upload")
 | 
					        print(f".. using existing upload")
 | 
				
			||||||
    else:
 | 
					    else:
 | 
				
			||||||
        image_data, width, height = util.convert_image(image_data)
 | 
					        image_data, mimetype, width, height = util.convert_sticker(image_data)
 | 
				
			||||||
        print(".", end="", flush=True)
 | 
					        print(".", end="", flush=True)
 | 
				
			||||||
        mxc = await matrix.upload(image_data, "image/png", file)
 | 
					        mxc = await matrix.upload(image_data, mimetype, file)
 | 
				
			||||||
        print(".", end="", flush=True)
 | 
					        print(".", end="", flush=True)
 | 
				
			||||||
        sticker = util.make_sticker(mxc, width, height, len(image_data), name)
 | 
					        sticker = util.make_sticker(mxc, width, height, len(image_data), mimetype, name)
 | 
				
			||||||
        sticker["id"] = sticker_id
 | 
					        sticker["id"] = sticker_id
 | 
				
			||||||
        print(" uploaded", flush=True)
 | 
					        print(" uploaded", flush=True)
 | 
				
			||||||
    return sticker
 | 
					    return sticker
 | 
				
			||||||
 | 
				
			|||||||
@ -33,11 +33,11 @@ async def reupload_document(client: TelegramClient, document: Document) -> matri
 | 
				
			|||||||
    print(f"Reuploading {document.id}", end="", flush=True)
 | 
					    print(f"Reuploading {document.id}", end="", flush=True)
 | 
				
			||||||
    data = await client.download_media(document, file=bytes)
 | 
					    data = await client.download_media(document, file=bytes)
 | 
				
			||||||
    print(".", end="", flush=True)
 | 
					    print(".", end="", flush=True)
 | 
				
			||||||
    data, width, height = util.convert_image(data)
 | 
					    data, mimetype, width, height = util.convert_sticker(data)
 | 
				
			||||||
    print(".", end="", flush=True)
 | 
					    print(".", end="", flush=True)
 | 
				
			||||||
    mxc = await matrix.upload(data, "image/png", f"{document.id}.png")
 | 
					    mxc = await matrix.upload(data, mimetype, f"{document.id}.png")
 | 
				
			||||||
    print(".", flush=True)
 | 
					    print(".", flush=True)
 | 
				
			||||||
    return util.make_sticker(mxc, width, height, len(data))
 | 
					    return util.make_sticker(mxc, width, height, len(data), mimetype)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					
 | 
				
			||||||
def add_meta(document: Document, info: matrix.StickerInfo, pack: StickerSetFull) -> None:
 | 
					def add_meta(document: Document, info: matrix.StickerInfo, pack: StickerSetFull) -> None:
 | 
				
			||||||
@ -81,7 +81,11 @@ async def reupload_pack(client: TelegramClient, pack: StickerSetFull, output_dir
 | 
				
			|||||||
            reuploaded_documents[document.id] = already_uploaded[document.id]
 | 
					            reuploaded_documents[document.id] = already_uploaded[document.id]
 | 
				
			||||||
            print(f"Skipped reuploading {document.id}")
 | 
					            print(f"Skipped reuploading {document.id}")
 | 
				
			||||||
        except KeyError:
 | 
					        except KeyError:
 | 
				
			||||||
            reuploaded_documents[document.id] = await reupload_document(client, document)
 | 
					            try:
 | 
				
			||||||
 | 
					                reuploaded_documents[document.id] = await reupload_document(client, document)
 | 
				
			||||||
 | 
					            except Exception as e:
 | 
				
			||||||
 | 
					                print(f"Failed to reupload {document.id}: {e}")
 | 
				
			||||||
 | 
					                continue
 | 
				
			||||||
        # Always ensure the body and telegram metadata is correct
 | 
					        # Always ensure the body and telegram metadata is correct
 | 
				
			||||||
        add_meta(document, reuploaded_documents[document.id], pack)
 | 
					        add_meta(document, reuploaded_documents[document.id], pack)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
				
			|||||||
		Loading…
	
	
			
			x
			
			
		
	
		Reference in New Issue
	
	Block a user