yama/testsuite/helpers/__init__.py

223 lines
6.5 KiB
Python

import os.path
import shutil
import unittest
from hashlib import sha256
from pathlib import Path
from random import Random
from typing import Tuple, Union

import attr
from immutabledict import immutabledict
# Raises a private unittest setting; presumably this stops long values from
# being truncated in assertion failure messages -- TODO confirm against
# unittest.util.
unittest.util._MAX_LENGTH = 8000
# Inclusive lower/upper bounds, in bytes, for the size of a generated file.
MIN_FILE_SIZE = 0
MAX_FILE_SIZE = 32 * 1024 * 1024
# Size in bytes of each chunk of file content handled at a time
# (e.g. while generating random files).
CHUNK_SIZE = 4096
# Characters from which random file and directory names are built.
ALPHABET = "abcdefghijklmnopqrstuvwxyz0123456789"
@attr.s(auto_attribs=True, frozen=True)
class FileDescriptor:
    """
    Immutable description of a single file: a content hash plus metadata.

    The helpers in this module fill the metadata fields from ``os.stat``.
    """

    # Hex-encoded SHA-256 digest of the file content.
    sha256_sum: str
    # Modification time, in whole milliseconds.
    mtime_ms: int
    # Mode value (``st_mode`` when produced by the helpers in this module).
    permissions: int
    # Owning user id (``st_uid``).
    owner: int
    # Owning group id (``st_gid``).
    group: int

    def ignore_metadata(self) -> "FileDescriptor":
        """
        Returns a copy that keeps only the content hash.

        All metadata fields (mtime, permissions, owner, group) are set to 0
        so that two descriptors can be compared on content alone.
        """
        # Field order: sha256_sum, mtime_ms, permissions, owner, group.
        return FileDescriptor(self.sha256_sum, 0, 0, 0, 0)
@attr.s(auto_attribs=True, frozen=True)
class DirectoryDescriptor:
    """
    Immutable description of a directory: its entries plus metadata.

    The helpers in this module fill the metadata fields from ``os.stat``.
    """

    # Maps entry name -> descriptor of that entry (file or sub-directory).
    contents: immutabledict[str, Union[FileDescriptor, "DirectoryDescriptor"]]
    # Modification time, in whole milliseconds.
    mtime_ms: int
    # Mode value (``st_mode`` when produced by the helpers in this module).
    permissions: int
    # Owning user id (``st_uid``).
    owner: int
    # Owning group id (``st_gid``).
    group: int

    def ignore_metadata(self) -> "DirectoryDescriptor":
        """
        Returns a copy of this tree with every metadata field set to 0.

        The operation is applied recursively to all entries, so the result
        only reflects names, structure and file content hashes.
        """
        stripped = {}
        for name, child in self.contents.items():
            stripped[name] = child.ignore_metadata()
        return DirectoryDescriptor(
            contents=immutabledict(stripped),
            mtime_ms=0,
            permissions=0,
            owner=0,
            group=0,
        )

    def print(self, prefix: str = ""):
        """
        Prints the tree to stdout, one entry per line.

        Directory names get a trailing "/" and their entries are printed
        right after them with a longer prefix.

        :param prefix: text to put in front of every printed name
        """
        for name, child in self.contents.items():
            is_directory = isinstance(child, DirectoryDescriptor)
            print(prefix + name + ("/" if is_directory else ""))
            if is_directory:
                child.print(prefix + " ")
def generate_random_file(rng: Random, path: Path) -> FileDescriptor:
    """
    Creates a file with random content at the given path.

    The size is picked with ``rng.randint(MIN_FILE_SIZE, MAX_FILE_SIZE)`` and
    the content is produced (and hashed) in pieces of at most CHUNK_SIZE bytes.

    :param rng: PRNG to use
    :param path: path to use
    :return: descriptor of the new file (SHA-256 of the content plus the
        mtime, mode, uid and gid read back from the file system)
    """
    remaining = rng.randint(MIN_FILE_SIZE, MAX_FILE_SIZE)
    digest = sha256()

    with path.open("wb") as out:
        # Always emits at least one (possibly empty) block, and never a
        # trailing empty one, so the PRNG is consumed in a fixed pattern.
        while True:
            block = rng.randbytes(min(remaining, CHUNK_SIZE))
            out.write(block)
            digest.update(block)
            remaining -= len(block)
            if remaining <= 0:
                break

    # Stat only after the file is closed so the metadata is final.
    stat_result = os.stat(path)
    return FileDescriptor(
        sha256_sum=digest.hexdigest(),
        mtime_ms=stat_result.st_mtime_ns // 1000000,
        permissions=stat_result.st_mode,
        owner=stat_result.st_uid,
        group=stat_result.st_gid,
    )
def generate_random_dir(
    rng: Random, path: Path, max_remaining_files: int, min_files: int = 0
) -> Tuple[DirectoryDescriptor, int]:
    """
    Creates a directory at the given path and fills it with random entries.

    Each entry is, with equal chance, either a random file or a nested random
    directory. Nested directories draw from whatever budget is left over.

    :param rng: PRNG to use
    :param path: path to use
    :param max_remaining_files: The maximum number of entries allowed.
    :param min_files: The minimum number of entries directly in this directory.
    :return: (descriptor, number of entries still allowed afterwards)
    """
    os.mkdir(path)

    entry_count = rng.randint(min_files, max_remaining_files)
    budget = max_remaining_files - entry_count

    entries = {}
    for _ in range(entry_count):
        name_length = rng.randint(4, 16)
        name = "".join([rng.choice(ALPHABET) for _ in range(name_length)])
        child_path = path / name

        if rng.choice((True, False)):
            entries[name] = generate_random_file(rng, child_path)
            continue

        # Sub-directories consume part of the remaining budget.
        child_descriptor, budget = generate_random_dir(rng, child_path, budget)
        entries[name] = child_descriptor

    # Stat after populating, so the mtime reflects the finished directory.
    dir_stat = os.stat(path)
    descriptor = DirectoryDescriptor(
        contents=immutabledict(entries),
        mtime_ms=dir_stat.st_mtime_ns // 1000000,
        permissions=dir_stat.st_mode,
        owner=dir_stat.st_uid,
        group=dir_stat.st_gid,
    )
    return descriptor, budget
def scan_file(path: Path) -> FileDescriptor:
    """
    Builds a descriptor for an existing file.

    The content is hashed with SHA-256 in CHUNK_SIZE pieces (the same chunk
    size used when generating files) and the metadata is read with os.stat.

    :param path: file to scan
    :return: descriptor with the content hash, mtime (ms), mode, uid and gid
    """
    sha256_hasher = sha256()
    with path.open("rb") as file:
        # An empty read signals end of file.
        while chunk := file.read(CHUNK_SIZE):
            sha256_hasher.update(chunk)

    file_stat = os.stat(path)
    return FileDescriptor(
        sha256_hasher.hexdigest(),
        file_stat.st_mtime_ns // 1000000,
        file_stat.st_mode,
        file_stat.st_uid,
        file_stat.st_gid,
    )
def scan_dir(path: Path) -> DirectoryDescriptor:
    """
    Recursively builds a descriptor for an existing directory.

    :param path: directory to scan
    :return: descriptor of the directory and everything below it
    :raises NotImplementedError: if an entry is neither a directory nor a
        regular file (the message is the entry's path)
    """
    # Collect the names up front and close the scandir handle right away, so
    # it is released even if scanning an entry raises, and so that no handle
    # is held open per directory level while recursing.
    # os.scandir never yields "." or "..", so no filtering is needed.
    with os.scandir(path) as entries:
        names = [entry.name for entry in entries]

    contents = {}
    for name in names:
        filepath = path.joinpath(name)
        if filepath.is_dir():
            contents[name] = scan_dir(filepath)
        elif filepath.is_file():
            contents[name] = scan_file(filepath)
        else:
            raise NotImplementedError(f"{filepath}")

    file_stat = os.stat(path)
    return DirectoryDescriptor(
        immutabledict(contents),
        file_stat.st_mtime_ns // 1000000,
        file_stat.st_mode,
        file_stat.st_uid,
        file_stat.st_gid,
    )
def randomly_mutate_file_in_descriptor(
    descriptor: FileDescriptor, path: Path, random: Random
) -> None:
    """
    Overwrites one to three random 4-byte spans of the file at the given path.

    Files shorter than 4 bytes (including empty files) are handled by writing
    at offset 0, which grows them to 4 bytes; longer files keep their size.

    :param descriptor: descriptor of the file (currently not consulted)
    :param path: file to mutate in place
    :param random: PRNG to use
    """
    patch_len = 4
    with path.open("r+b") as file:
        length_of_file = path.stat().st_size
        # Clamp to 0: without this, files shorter than the patch would make
        # randint() raise ValueError because of an empty range.
        last_start = max(0, length_of_file - patch_len)
        mutations_to_make = random.randint(1, 3)
        for _ in range(mutations_to_make):
            mutate_at_position = random.randint(0, last_start)
            replace_with = random.randbytes(patch_len)
            file.seek(mutate_at_position)
            file.write(replace_with)
def randomly_mutate_directory_in_descriptor(
    descriptor: DirectoryDescriptor, path: Path, rng: Random
) -> None:
    """
    Randomly changes the on-disk directory that the descriptor describes.

    For every entry listed in the descriptor:

    * with a 10% chance the entry is deleted from disk;
    * otherwise a file is mutated with a 60% chance, and a directory is
      always descended into and mutated recursively.

    Afterwards up to three new random entries may be created in this
    directory (80% files, 20% directories of at most 5 entries).

    The descriptor itself is not updated; rescan the directory to obtain a
    description of the mutated tree.

    :param descriptor: description of the directory as it currently is on disk
    :param path: directory to mutate in place
    :param rng: PRNG to use
    """
    for name, value in descriptor.contents.items():
        target = path.joinpath(name)
        if rng.random() < 0.1:
            # just delete this (with low 10% chance)
            if isinstance(value, DirectoryDescriptor):
                shutil.rmtree(target)
            else:
                target.unlink()
            continue

        if isinstance(value, FileDescriptor):
            if rng.random() < 0.6:
                randomly_mutate_file_in_descriptor(value, target, rng)
        else:
            assert isinstance(value, DirectoryDescriptor)
            randomly_mutate_directory_in_descriptor(value, target, rng)

    # introduce some new files, maybe.
    # (negative draws are clamped, so "no new files" is the most likely case)
    new_files_to_introduce = max(0, rng.randint(-3, 3))
    for _ in range(new_files_to_introduce):
        filename_len = rng.randint(4, 16)
        filename = "".join(rng.choice(ALPHABET) for _ in range(filename_len))
        filepath = path.joinpath(filename)
        if rng.random() < 0.8:
            generate_random_file(rng, filepath)
        else:
            generate_random_dir(rng, filepath, 5)