223 lines
6.5 KiB
Python
223 lines
6.5 KiB
Python
import os.path
|
|
import unittest
|
|
from hashlib import sha256
|
|
from pathlib import Path
|
|
from random import Random
|
|
from typing import Tuple, Union
|
|
|
|
import attr
|
|
from immutabledict import immutabledict
|
|
|
|
# Private unittest knob; presumably raises the length at which long values are
# truncated in assertion failure output -- TODO confirm against unittest.util.
unittest.util._MAX_LENGTH = 8000

# Inclusive bounds (in bytes) for the size of a randomly generated file.
MIN_FILE_SIZE = 0
MAX_FILE_SIZE = 32 * 1024**2

# Number of bytes generated, written and hashed per step.
CHUNK_SIZE = 4 * 1024

# Characters that random file and directory names are drawn from.
ALPHABET = "abcdefghijklmnopqrstuvwxyz" "0123456789"
|
|
|
|
|
|
@attr.s(auto_attribs=True, frozen=True)
class FileDescriptor:
    """Frozen record describing one file: a content hash plus stat metadata."""

    # SHA-256 digest of the file contents (callers in this file pass a hex string).
    sha256_sum: str
    # Modification time in milliseconds.
    mtime_ms: int
    # Callers in this file store the raw ``st_mode`` value here.
    permissions: int
    # Numeric user id of the owner.
    owner: int
    # Numeric group id.
    group: int

    def ignore_metadata(self) -> "FileDescriptor":
        """
        Returns a copy that keeps only the content hash.

        :return: descriptor with the same sha256_sum and every metadata field
            (mtime_ms, permissions, owner, group) set to 0
        """
        content_only = FileDescriptor(self.sha256_sum, 0, 0, 0, 0)
        return content_only
|
|
|
|
|
|
@attr.s(auto_attribs=True, frozen=True)
class DirectoryDescriptor:
    """Frozen record describing one directory and, recursively, its entries."""

    # Maps each entry name to the descriptor of that file or sub-directory.
    contents: immutabledict[str, Union[FileDescriptor, "DirectoryDescriptor"]]
    # Modification time in milliseconds.
    mtime_ms: int
    # Callers in this file store the raw ``st_mode`` value here.
    permissions: int
    # Numeric user id of the owner.
    owner: int
    # Numeric group id.
    group: int

    def ignore_metadata(self) -> "DirectoryDescriptor":
        """
        Returns a copy of the whole tree with metadata zeroed at every level.

        :return: descriptor whose entries have each had ignore_metadata()
            applied and whose own mtime_ms, permissions, owner and group are 0
        """
        stripped = {}
        for entry_name, entry in self.contents.items():
            stripped[entry_name] = entry.ignore_metadata()
        return DirectoryDescriptor(
            immutabledict(stripped), mtime_ms=0, permissions=0, owner=0, group=0
        )

    def print(self, prefix: str = ""):
        """
        Prints the entry names as a tree; directories get a trailing slash.

        :param prefix: text put in front of every printed name; it grows by one
            space per nesting level
        """
        # Inside this method, ``print`` is still the builtin: class-level names
        # are not visible from a method body.
        for key, value in self.contents.items():
            if not isinstance(value, DirectoryDescriptor):
                print(prefix + key)
                continue
            print(prefix + key + "/")
            value.print(prefix + " ")
|
|
|
|
|
|
def generate_random_file(rng: Random, path: Path) -> FileDescriptor:
    """
    Writes a file of random size and random content to the given path, and
    returns a descriptor of what was written.

    The size is picked with rng.randint(MIN_FILE_SIZE, MAX_FILE_SIZE); the
    content is generated, written and hashed in pieces of at most CHUNK_SIZE
    bytes.

    :param rng: PRNG to use
    :param path: path to use (opened in "wb" mode)
    :return: descriptor holding the SHA-256 hex digest of the written bytes and
        the mtime (in ms), mode, uid and gid that os.stat reports for the file
    """
    remaining = rng.randint(MIN_FILE_SIZE, MAX_FILE_SIZE)
    digest = sha256()

    with path.open("wb") as out:
        while True:
            # The final piece is whatever is left once no more than one chunk
            # remains; it may be empty, and is still requested from the PRNG so
            # the sequence of PRNG calls does not depend on the loop shape.
            is_last = remaining <= CHUNK_SIZE
            piece = rng.randbytes(remaining if is_last else CHUNK_SIZE)
            out.write(piece)
            digest.update(piece)
            if is_last:
                break
            remaining -= CHUNK_SIZE

    hex_digest = digest.hexdigest()
    info = os.stat(path)

    return FileDescriptor(
        sha256_sum=hex_digest,
        mtime_ms=info.st_mtime_ns // 1000000,
        permissions=info.st_mode,
        owner=info.st_uid,
        group=info.st_gid,
    )
|
|
|
|
|
|
def generate_random_dir(
    rng: Random, path: Path, max_remaining_files: int, min_files: int = 0
) -> Tuple[DirectoryDescriptor, int]:
    """
    Creates a directory at the given path, fills it with randomly named files
    and sub-directories, and returns its descriptor together with the part of
    the entry budget that is still unused.

    Every direct entry (file or sub-directory) counts against the budget.
    Sub-directories are generated recursively and draw from the same budget.

    :param rng: PRNG to use
    :param path: path to use (must not exist yet; created with os.mkdir)
    :param max_remaining_files: The maximum number of entries allowed.
    :param min_files: The minimum number of entries in this directory directly.
    :return: (descriptor, number of entries allowed remaining)
    """
    os.mkdir(path)

    entry_count = rng.randint(min_files, max_remaining_files)
    budget = max_remaining_files - entry_count

    children = {}
    for _ in range(entry_count):
        name_length = rng.randint(4, 16)
        letters = [rng.choice(ALPHABET) for _ in range(name_length)]
        entry_name = "".join(letters)
        entry_path = path / entry_name

        # Coin flip between a regular file and a nested directory.
        if rng.choice((True, False)):
            children[entry_name] = generate_random_file(rng, entry_path)
            continue

        subdir, budget = generate_random_dir(rng, entry_path, budget)
        children[entry_name] = subdir

    # Stat only after the entries exist, so the result reflects the populated
    # directory.
    info = os.stat(path)
    descriptor = DirectoryDescriptor(
        contents=immutabledict(children),
        mtime_ms=info.st_mtime_ns // 1000000,
        permissions=info.st_mode,
        owner=info.st_uid,
        group=info.st_gid,
    )
    return descriptor, budget
|
|
|
|
|
|
def scan_file(path: Path) -> FileDescriptor:
    """
    Reads the file at the given path and returns its descriptor.

    :param path: file to read (opened in "rb" mode, hashed 4096 bytes at a time)
    :return: descriptor holding the SHA-256 hex digest of the contents and the
        mtime (in ms), mode, uid and gid that os.stat reports for the file
    """
    digest = sha256()
    with path.open("rb") as source:
        # read() returns b"" at end of file, which ends the loop.
        while block := source.read(4096):
            digest.update(block)

    hex_digest = digest.hexdigest()
    info = os.stat(path)
    return FileDescriptor(
        sha256_sum=hex_digest,
        mtime_ms=info.st_mtime_ns // 1000000,
        permissions=info.st_mode,
        owner=info.st_uid,
        group=info.st_gid,
    )
|
|
|
|
|
|
def scan_dir(path: Path) -> DirectoryDescriptor:
    """
    Recursively scans the directory at the given path and returns its
    descriptor.

    :param path: directory to scan
    :return: descriptor mapping every entry name to the descriptor of that file
        or sub-directory, plus the mtime (in ms), mode, uid and gid that
        os.stat reports for the directory itself
    :raises NotImplementedError: if an entry is neither a directory nor a
        regular file according to Path.is_dir() / Path.is_file()
    """
    # Collect the names and close the scandir handle before recursing. The
    # handle is then released even if an entry below raises, and only one
    # directory handle is open at a time regardless of tree depth.
    # os.scandir never yields "." or "..", so no filtering is needed.
    with os.scandir(path) as entries:
        names = [entry.name for entry in entries]

    contents = {}
    for name in names:
        filepath = path.joinpath(name)
        if filepath.is_dir():
            contents[name] = scan_dir(filepath)
        elif filepath.is_file():
            contents[name] = scan_file(filepath)
        else:
            raise NotImplementedError(f"{filepath}")

    file_stat = os.stat(path)
    return DirectoryDescriptor(
        immutabledict(contents),
        file_stat.st_mtime_ns // 1000000,
        file_stat.st_mode,
        file_stat.st_uid,
        file_stat.st_gid,
    )
|
|
|
|
|
|
def randomly_mutate_file_in_descriptor(
    descriptor: "FileDescriptor", path: Path, random: Random
) -> None:
    """
    Overwrites one to three randomly placed 4-byte spans of the file at the
    given path with random bytes.

    Files shorter than 4 bytes (including empty files) are patched at offset 0,
    which grows them to 4 bytes; longer files keep their length.

    :param descriptor: descriptor of the file (not read by this function)
    :param path: file to modify in place (opened in "r+b" mode)
    :param random: PRNG to use
    """
    patch_length = 4
    with path.open("r+b") as file:
        length_of_file = path.stat().st_size
        # Clamp to 0 so that short or empty files do not hand randint an empty
        # range (which raises ValueError).
        last_valid_start = max(0, length_of_file - patch_length)
        mutations_to_make = random.randint(1, 3)
        for _ in range(mutations_to_make):
            mutate_at_position = random.randint(0, last_valid_start)
            replace_with = random.randbytes(patch_length)
            file.seek(mutate_at_position)
            file.write(replace_with)
|
|
|
|
|
|
def randomly_mutate_directory_in_descriptor(
    descriptor: "DirectoryDescriptor", path: Path, rng: Random
) -> None:
    """
    Randomly deletes, modifies and adds entries in the directory at the given
    path, walking the entries listed in the descriptor.

    For each entry in the descriptor: with 10% chance it is deleted from disk;
    otherwise a file is modified with 60% chance and a sub-directory is always
    processed recursively. Afterwards up to three new entries are created, each
    a random file with 80% chance and otherwise a random directory.

    The descriptor itself is not updated.

    :param descriptor: descriptor listing the entries expected under path
    :param path: directory to modify in place
    :param rng: PRNG to use
    """
    # Local import: only this function needs it.
    import shutil

    for name, value in descriptor.contents.items():
        target = path.joinpath(name)
        if rng.random() < 0.1:
            # Delete this entry (with low 10% chance). What is removed is
            # decided by what is on disk; a symlink is unlinked, not followed.
            if target.is_dir() and not target.is_symlink():
                shutil.rmtree(target)
            else:
                target.unlink()
            continue

        if isinstance(value, FileDescriptor):
            if rng.random() < 0.6:
                randomly_mutate_file_in_descriptor(value, target, rng)
        else:
            assert isinstance(value, DirectoryDescriptor)
            randomly_mutate_directory_in_descriptor(value, target, rng)

    # Introduce some new files, maybe: drawing from -3..3 and clamping at 0
    # means that four of the seven outcomes add nothing.
    new_files_to_introduce = max(0, rng.randint(-3, 3))
    for _ in range(new_files_to_introduce):
        filename_len = rng.randint(4, 16)
        filename = "".join(rng.choice(ALPHABET) for _ in range(filename_len))
        filepath = path.joinpath(filename)
        if rng.random() < 0.8:
            generate_random_file(rng, filepath)
        else:
            generate_random_dir(rng, filepath, 5)
|