commit 328c99924c8ba41bf756ab4975654e44cfdd6709 Author: Olivier Date: Thu Sep 24 18:53:31 2020 +0100 Initial commit before I start messing around diff --git a/mypy.ini b/mypy.ini new file mode 100644 index 0000000..9f4616d --- /dev/null +++ b/mypy.ini @@ -0,0 +1,23 @@ +[mypy] +check_untyped_defs = True + +[mypy-cbor2] +ignore_missing_imports = True + +[mypy-toposort] +ignore_missing_imports = True + +[mypy-asyncssh] +ignore_missing_imports = True + +[mypy-nacl.*] +ignore_missing_imports = True + +[mypy-secretstorage] +ignore_missing_imports = True + +[mypy-canonicaljson] +ignore_missing_imports = True + +[mypy-asyncpg] +ignore_missing_imports = True diff --git a/requirements.txt b/requirements.txt new file mode 100644 index 0000000..d6e1198 --- /dev/null +++ b/requirements.txt @@ -0,0 +1 @@ +-e . diff --git a/scone/__init__.py b/scone/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/scone/__main__.py b/scone/__main__.py new file mode 100644 index 0000000..c12ff05 --- /dev/null +++ b/scone/__main__.py @@ -0,0 +1,55 @@ +# import asyncio +# import itertools +# import sys +# from typing import List +# +# from scone.head import Head, Recipe +# from scone.head.kitchen import Kitchen +# from scone.head.recipe import Preparation + +# def main(args=None): +# if args is None: +# args = sys.argv[1:] +# +# if len(args) < 1: +# raise RuntimeError("Needs to be passed a sous config directory as 1st arg!") +# +# print("Am I a head?") +# +# head = Head.open(args[0]) +# +# print(head.debug_info()) +# +# recipes_by_sous = head.construct_recipes() +# +# all_recipes: List[Recipe] = list( +# itertools.chain.from_iterable(recipes_by_sous.values()) +# ) +# +# prepare = Preparation(all_recipes) +# order = prepare.prepare(head) +# +# for epoch, items in enumerate(order): +# print(f"----- Course {epoch} -----") +# +# for item in items: +# if isinstance(item, Recipe): +# print(f" > recipe {item}") +# elif isinstance(item, tuple): +# kind, ident, extra = item +# print(f" - we now have {kind} {ident} {dict(extra)}") +# +# print("Starting run") +# +# k = Kitchen(head) +# +# async def cook(): +# for epoch, epoch_items in enumerate(order): +# print(f"Cooking Course {epoch} of {len(order)}") +# await k.run_epoch(epoch_items) +# +# asyncio.get_event_loop().run_until_complete(cook()) +# +# +# if __name__ == "__main__": +# main() diff --git a/scone/common/__init__.py b/scone/common/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/scone/common/chanpro.py b/scone/common/chanpro.py new file mode 100644 index 0000000..6b97afc --- /dev/null +++ b/scone/common/chanpro.py @@ -0,0 +1,172 @@ +import asyncio +import logging +import struct +import sys +from asyncio import Queue, Task +from asyncio.streams import FlowControlMixin, StreamReader, StreamWriter +from typing import Any, Dict, Optional + +import attr +import cattr +import cbor2 + +SIZE_FORMAT = "!I" +logger = logging.getLogger(__name__) + + +class ChanPro: + def __init__(self, in_stream: StreamReader, out_stream: StreamWriter): + self._in = in_stream + self._out = out_stream + self._channels: Dict[int, "Channel"] = {} + self._listener: Optional[Task] = None + + async def close(self) -> None: + # TODO cancel _listener? + pass + + @staticmethod + async def open_from_stdio() -> "ChanPro": + loop = asyncio.get_event_loop() + reader = asyncio.StreamReader() + reader_protocol = asyncio.StreamReaderProtocol(reader) + await loop.connect_read_pipe(lambda: reader_protocol, sys.stdin.buffer) + + writer_transport, writer_protocol = await loop.connect_write_pipe( + FlowControlMixin, sys.stdout.buffer + ) + writer = StreamWriter(writer_transport, writer_protocol, None, loop) + + return ChanPro(reader, writer) + + async def _send_dict(self, dictionary: dict): + encoded = cbor2.dumps(dictionary) + encoded_len = struct.pack(SIZE_FORMAT, len(encoded)) + self._out.write(encoded_len) + self._out.write(encoded) + await self._out.drain() + + async def _recv_dict(self) -> dict: + size = struct.calcsize(SIZE_FORMAT) + encoded_len = await self._in.readexactly(size) + (length,) = struct.unpack(SIZE_FORMAT, encoded_len) + encoded = await self._in.readexactly(length) + return cbor2.loads(encoded) + + def new_channel(self, number: int, desc: str): + if number in self._channels: + channel = self._channels[number] + raise ValueError(f"Channel {number} already in use ({channel}).") + channel = Channel(number, desc, self) + self._channels[number] = channel + return channel + + async def send_message(self, channel: int, payload: Any): + await self._send_dict({"c": channel, "p": payload}) + + async def send_close(self, channel: int, reason: str = None): + # TODO extend with error indication capability (remote throw) ? + # TODO might want to wait until other end closed? + await self._send_dict({"c": channel, "close": True, "reason": reason}) + + def start_listening_to_channels(self, default_route: Optional["Channel"]): + async def channel_listener(): + idx = 0 + while True: + message = await self._recv_dict() + logger.debug(" %d %r", idx, message) + idx += 1 + await self.handle_incoming_message(message, default_route=default_route) + + self._listener = asyncio.create_task( + channel_listener() # py 3.8 , name="chanpro channel listener" + ) + + async def handle_incoming_message( + self, message: dict, default_route: Optional["Channel"] = None + ): + if "c" not in message: + logger.warning("Received message without channel number.") + channel_num = message["c"] + channel = self._channels.get(channel_num) + if not channel: + if default_route: + await default_route._queue.put({"lost": message}) + else: + logger.warning( + "Received message about non-existent channel number %r.", + channel_num, + ) + return + # XXX todo send msg, what about shutdown too? + if "p" in message: + # payload on channel + await channel._queue.put(message["p"]) + elif "close" in message: + channel._closed = True + await channel._queue.put(None) + else: + raise ValueError(f"Unknown channel message with keys {message.keys()}") + + +class Channel: + def __init__(self, number: int, desc: str, chanpro: ChanPro): + self.number = number + self.description = desc + self.chanpro = chanpro + self._queue: Queue[Any] = Queue() + self._closed = False + + def __str__(self): + return f"Channel №{self.number} ({self.description})" + + async def send(self, payload: Any): + if attr.has(payload.__class__): + payload = cattr.unstructure(payload) + await self.chanpro.send_message(self.number, payload) + + async def recv(self) -> Any: + if self._queue.empty() and self._closed: + raise EOFError("Channel closed.") + item = await self._queue.get() + if item is None and self._queue.empty() and self._closed: + raise EOFError("Channel closed.") + return item + + async def close(self, reason: str = None): + if not self._closed: + self._closed = True + await self._queue.put(None) + await self.chanpro.send_close(self.number, reason) + + async def wait_close(self): + try: + await self.recv() + raise RuntimeError("Message arrived when expecting closure.") + except EOFError: + # expected + return + + async def consume(self) -> Any: + """ + Consume the last item of the channel and assert closure. + The last item is returned. + """ + item = await self.recv() + await self.wait_close() + return item + + +class ChanProHead: + def __init__(self, chanpro: ChanPro, channel0: Channel): + self._chanpro = chanpro + self._channel0 = channel0 + self._next_channel_id = 1 + + async def start_command_channel(self, command: str, payload: Any) -> Channel: + new_channel = self._chanpro.new_channel(self._next_channel_id, command) + self._next_channel_id += 1 + await self._channel0.send( + {"nc": new_channel.number, "cmd": command, "pay": payload} + ) + return new_channel diff --git a/scone/common/loader.py b/scone/common/loader.py new file mode 100644 index 0000000..af5c186 --- /dev/null +++ b/scone/common/loader.py @@ -0,0 +1,45 @@ +import importlib +import pkgutil +from inspect import isclass +from typing import Any, Callable, Dict, Generic, Optional, TypeVar + +T = TypeVar("T") + + +class ClassLoader(Generic[T]): + def __init__(self, clarse: Any, name_getter: Callable[[Any], Optional[str]]): + self._class = clarse + self._classes: Dict[str, Callable[[str, str, dict], T]] = dict() + self._name_getter = name_getter + + def add_package_root(self, module_root: str): + module = importlib.import_module(module_root) + self._add_module(module) + + # find subpackages + for mod in pkgutil.iter_modules(module.__path__): # type: ignore + if mod.ispkg: + self.add_package_root(module_root + "." + mod.name) + else: + submodule = importlib.import_module(module_root + "." + mod.name) + self._add_module(submodule) + + def _add_module(self, module): + # find recipes + for name in dir(module): + item = getattr(module, name) + if isclass(item) and issubclass(item, self._class): + reg_name = self._name_getter(item) + if reg_name is not None: + self._classes[reg_name] = item + + def get_class(self, name: str): + return self._classes.get(name) + + def __str__(self) -> str: + lines = ["Generic Loader. Loaded stuff:"] + + for recipe_name, recipe_class in self._classes.items(): + lines.append(f" - {recipe_name} from {recipe_class.__module__}") + + return "\n".join(lines) diff --git a/scone/common/misc.py b/scone/common/misc.py new file mode 100644 index 0000000..f434984 --- /dev/null +++ b/scone/common/misc.py @@ -0,0 +1,42 @@ +import os +import sys +from hashlib import sha256 + + +def eprint(*args, **kwargs): + kwargs["file"] = sys.stderr + print(*args, **kwargs) + + +def sha256_dir(path: str) -> str: + items = {} + with os.scandir(path) as scandir: + for dir_entry in scandir: + if dir_entry.is_dir(): + items[dir_entry.name] = sha256_dir(dir_entry.path) + else: + items[dir_entry.name] = sha256_file(dir_entry.path) + items_sorted = list(items.items()) + items_sorted.sort() + buf = b"" + for fname, fhash in items_sorted: + buf += fname.encode() + buf += b"\0" + buf += fhash.encode() + buf += b"\0" + return sha256_bytes(buf) + + +def sha256_file(path: str) -> str: + hasher = sha256(b"") + with open(path, "rb") as fread: + while True: + data = fread.read(8192 * 1024) + if not data: + break + hasher.update(data) + return hasher.hexdigest() + + +def sha256_bytes(data: bytes) -> str: + return sha256(data).hexdigest() diff --git a/scone/common/modeutils.py b/scone/common/modeutils.py new file mode 100644 index 0000000..268f226 --- /dev/null +++ b/scone/common/modeutils.py @@ -0,0 +1,62 @@ +import re +from typing import Union + +# Opinionated default modes for personal use. +# Security conscious but also reasonable. +DEFAULT_MODE_FILE = 0o660 +DEFAULT_MODE_DIR = 0o775 + + +def parse_mode(mode_code: Union[str, int], directory: bool) -> int: + look_up = {"r": 0o4, "w": 0o2, "x": 0o1} + mults = {"u": 0o100, "g": 0o010, "o": 0o001, "a": 0o111} + mode = 0 + + if isinstance(mode_code, int): + return mode_code + + pieces = mode_code.split(",") + + for piece in pieces: + piecebits = 0 + match = re.fullmatch( + r"(?P[ugoa]+)(?P[-+=])(?P[rwxXst]*)", piece + ) + if match is None: + raise ValueError(f"Did not understand mode string {piece}") + affected = set(match.group("affected")) + op = match.group("op") + values = set(match.group("value")) + if "X" in values: + values.remove("X") + if directory: + values.add("x") + + mult = 0 + for affectee in affected: + mult |= mults[affectee] + + for value in values: + if value in ("r", "w", "x"): + piecebits |= look_up[value] * mult + elif value == "s": + if "u" in affected: + piecebits |= 0o4000 + if "g" in affected: + piecebits |= 0o2000 + elif value == "t": + piecebits |= 0o1000 + + if op == "=": + # OR with piecebits allows setting suid, sgid and sticky. + mask = (mult * 0o7) | piecebits + mode &= ~mask + mode |= piecebits + elif op == "+": + mode |= piecebits + elif op == "-": + mode &= ~piecebits + else: + raise RuntimeError("op not [-+=].") + + return mode diff --git a/scone/common/pools.py b/scone/common/pools.py new file mode 100644 index 0000000..ab992ed --- /dev/null +++ b/scone/common/pools.py @@ -0,0 +1,20 @@ +from concurrent.futures.process import ProcessPoolExecutor +from concurrent.futures.thread import ThreadPoolExecutor + + +class Pools: + _instance = None + + def __init__(self): + self.threaded = ThreadPoolExecutor() + self.process = ProcessPoolExecutor() + + @staticmethod + def get(): + if not Pools._instance: + Pools._instance = Pools() + return Pools._instance + + def shutdown(self): + self.threaded.shutdown() + self.process.shutdown() diff --git a/scone/default/__init__.py b/scone/default/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/scone/default/recipes/__init__.py b/scone/default/recipes/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/scone/default/recipes/apt.py b/scone/default/recipes/apt.py new file mode 100644 index 0000000..8b50f22 --- /dev/null +++ b/scone/default/recipes/apt.py @@ -0,0 +1,88 @@ +from typing import Dict, List, Set, Tuple + +from scone.default.utensils.basic_utensils import SimpleExec +from scone.head import Head, Recipe +from scone.head.kitchen import Kitchen +from scone.head.recipe import Preparation +from scone.head.utils import check_type + + +class AptInstallInternal(Recipe): + """ + Actually installs the packages; does it in a single batch for efficiency! + """ + + _NAME = "apt-install.internal" + + # TODO(extension, low): expand this into apt-install-now if we need + # the flexibility + + def __init__(self, host: str, slug: str, args: dict, head: "Head"): + super().__init__(host, slug, args, head) + + self.packages: Set[str] = set() + + args["packages"] = self.packages + args[".source"] = ("@virtual", "apt-install-internal", "the one true AII") + + def get_user(self, head: "Head") -> str: + return "root" + + def prepare(self, preparation: Preparation, head: "Head") -> None: + super().prepare(preparation, head) + preparation.needs("apt-stage", "internal-install-packages") + preparation.needs("apt-stage", "repositories-declared") + preparation.provides("apt-stage", "packages-installed") + + async def cook(self, kitchen: Kitchen) -> None: + # apt-installs built up the args to represent what was needed, so this + # will work as-is + kitchen.get_dependency_tracker() + + if self.packages: + update = await kitchen.ut1areq( + SimpleExec(["apt-get", "-yq", "update"], "/"), SimpleExec.Result + ) + if update.exit_code != 0: + raise RuntimeError( + f"apt update failed with err {update.exit_code}: {update.stderr!r}" + ) + + install_args = ["apt-get", "-yq", "install"] + install_args += list(self.packages) + install = await kitchen.ut1areq( + SimpleExec(install_args, "/"), SimpleExec.Result + ) + + if install.exit_code != 0: + raise RuntimeError( + f"apt install failed with err {install.exit_code}:" + f" {install.stderr!r}" + ) + + +class AptPackage(Recipe): + _NAME = "apt-install" + + internal_installers: Dict[Tuple[Head, str], AptInstallInternal] = {} + + def __init__(self, host: str, slug: str, args: dict, head: Head): + super().__init__(host, slug, args, head) + self.packages: List[str] = check_type(args["packages"], list) + + def prepare(self, preparation: Preparation, head: Head) -> None: + super().prepare(preparation, head) + pair = (head, self.get_host()) + if pair not in AptPackage.internal_installers: + install_internal = AptInstallInternal(self.get_host(), "internal", {}, head) + AptPackage.internal_installers[pair] = install_internal + preparation.subrecipe(install_internal) + preparation.provides("apt-stage", "internal-install-packages") + + internal_installer = AptPackage.internal_installers.get(pair) + assert internal_installer is not None + internal_installer.packages.update(self.packages) + + async def cook(self, kitchen: Kitchen) -> None: + # can't be tracked + kitchen.get_dependency_tracker().ignore() diff --git a/scone/default/recipes/filesystem.py b/scone/default/recipes/filesystem.py new file mode 100644 index 0000000..0750118 --- /dev/null +++ b/scone/default/recipes/filesystem.py @@ -0,0 +1,322 @@ +from pathlib import Path +from typing import List + +from scone.common.modeutils import DEFAULT_MODE_DIR, parse_mode +from scone.default.steps.basic_steps import exec_no_fails +from scone.default.steps.filesystem_steps import depend_remote_file +from scone.default.utensils.basic_utensils import ( + Chmod, + Chown, + MakeDirectory, + SimpleExec, + Stat, +) +from scone.default.utensils.dynamic_dependencies import HasChangedInSousStore +from scone.head import Head, Recipe +from scone.head.kitchen import Kitchen +from scone.head.recipe import Preparation +from scone.head.utils import check_type, check_type_opt + + +class DeclareFile(Recipe): + """ + Declares that a file already exists on the sous. + + Maybe we will assert it in the future? + """ + + _NAME = "declare-file" + + def prepare(self, preparation: Preparation, head: Head): + preparation.provides("file", self._args["path"]) + + async def cook(self, kitchen: Kitchen): + # mark as tracked. + kitchen.get_dependency_tracker() + + +class DeclareDirectory(Recipe): + """ + Declares that a directory already exists on the sous. + + Maybe we will assert it in the future? + """ + + _NAME = "declare-dir" + + def prepare(self, preparation: Preparation, head: Head): + preparation.provides("directory", self._args["path"]) + + async def cook(self, kitchen: Kitchen): + # mark as tracked. + kitchen.get_dependency_tracker() + + +class EnsureDirectory(Recipe): + """ + Makes a directory tree. + """ + + _NAME = "directory" + + def __init__(self, host: str, slug: str, args: dict, head: "Head"): + super().__init__(host, slug, args, head) + parents = args.get("parents", 0) + assert isinstance(parents, int) + + path = args.get("path") + assert isinstance(path, str) + + mode = args.get("mode", DEFAULT_MODE_DIR) + assert isinstance(mode, str) or isinstance(mode, int) + + self.path = path + self.parents = parents + self.mode = parse_mode(mode, directory=True) + self._make: List[str] = [] + self.targ_user = args.get("owner", self.get_user(head)) + self.targ_group = args.get("group", self.targ_user) + + def prepare(self, preparation: Preparation, head: "Head"): + super().prepare(preparation, head) + preparation.needs("os-user", self.targ_user) + preparation.needs("os-group", self.targ_group) + preparation.provides("directory", self.path) + self._make.append(self.path) + parent = Path(self.path).parent + for _ in range(self.parents): + self._make.append(str(parent)) + preparation.provides("directory", str(parent)) + parent = parent.parent + preparation.needs("directory", str(parent)) + self._make.reverse() + + async def cook(self, k: Kitchen): + for directory in self._make: + stat = await k.ut1a(Stat(directory), Stat.Result) + if stat is None: + # doesn't exist, make it + await k.ut0(MakeDirectory(directory, self.mode)) + + stat = await k.ut1a(Stat(directory), Stat.Result) + if stat is None: + raise RuntimeError("Directory vanished after creation!") + + if stat.dir: + if (stat.user, stat.group) != (self.targ_user, self.targ_group): + # need to chown + await k.ut0(Chown(directory, self.targ_user, self.targ_group)) + + if stat.mode != self.mode: + await k.ut0(Chmod(directory, self.mode)) + else: + raise RuntimeError("Already exists but not a dir: " + directory) + + # mark as tracked. + k.get_dependency_tracker() + + +class ExtractTar(Recipe): + """ + Extracts a tar archive, expecting to get at least some files. + """ + + _NAME = "tar-extract" + + def __init__(self, host: str, slug: str, args: dict, head: "Head"): + super().__init__(host, slug, args, head) + + self.tar = check_type(args.get("tar"), str) + self.dir = check_type(args.get("dir"), str) + self.expect_files = check_type(args.get("expects_files"), List[str]) + + def prepare(self, preparation: Preparation, head: "Head"): + super().prepare(preparation, head) + preparation.needs("file", self.tar) + preparation.needs("directory", self.dir) + for file in self.expect_files: + assert isinstance(file, str) + final = str(Path(self.dir, file)) + preparation.provides("file", final) + + async def cook(self, k: "Kitchen"): + res = await k.ut1areq( + SimpleExec(["tar", "xf", self.tar], self.dir), SimpleExec.Result + ) + if res.exit_code != 0: + raise RuntimeError( + f"tar failed with ec {res.exit_code}; stderr = <<<" + f"\n{res.stderr.decode()}\n>>>" + ) + + for expect_relative in self.expect_files: + expect = str(Path(self.dir, expect_relative)) + stat = await k.ut1a(Stat(expect), Stat.Result) + if stat is None: + raise RuntimeError( + f"tar succeeded but expectation failed; {expect!r} not found." + ) + + +class RunScript(Recipe): + """ + Runs a script (such as an installation script). + """ + + _NAME = "script-run" + + def __init__(self, host: str, slug: str, args: dict, head: "Head"): + super().__init__(host, slug, args, head) + + self.working_dir = check_type(args.get("working_dir"), str) + + # relative to working dir + self.script = check_type(args.get("script"), str) + + # todo other remote dependencies + # todo provided files as a result of the script exec + + def prepare(self, preparation: Preparation, head: "Head"): + super().prepare(preparation, head) + final_script = str(Path(self.working_dir, self.script)) + preparation.needs("file", final_script) + + # TODO more needs + # TODO preparation.provides() + + async def cook(self, kitchen: "Kitchen"): + final_script = str(Path(self.working_dir, self.script)) + await depend_remote_file(final_script, kitchen) + + +class CommandOnChange(Recipe): + """ + Runs a command when at least one file listed has changed on the remote. + """ + + _NAME = "command-on-change" + + def __init__(self, host: str, slug: str, args: dict, head: Head): + super().__init__(host, slug, args, head) + + self.purpose = check_type(args.get("purpose"), str) + self.command = check_type(args.get("command"), list) + self.watching = check_type(args.get("files"), list) + self.working_dir = check_type(args.get("working_dir", "/"), str) + + def prepare(self, preparation: Preparation, head: Head) -> None: + super().prepare(preparation, head) + for file in self.watching: + preparation.needs("file", file) + + async def cook(self, kitchen: Kitchen) -> None: + kitchen.get_dependency_tracker().ignore() + + changed = await kitchen.ut1(HasChangedInSousStore(self.purpose, self.watching)) + + if changed: + result = await kitchen.ut1areq( + SimpleExec(self.command, self.working_dir), SimpleExec.Result + ) + + if result.exit_code != 0: + raise RuntimeError( + f"exit code not 0 ({result.exit_code}), {result.stderr!r}" + ) + + +class GitCheckout(Recipe): + _NAME = "git" + + # TODO(correctness): branches can change (tags too), but this will still + # declare SAFE_TO_SKIP. Perhaps we want to stop that unless you opt out? + # But oh well for now. + + def __init__(self, host: str, slug: str, args: dict, head: Head): + super().__init__(host, slug, args, head) + + self.repo_src = check_type(args.get("src"), str) + self.dest_dir = check_type(args.get("dest"), str) + self.ref = check_type_opt(args.get("ref"), str) + self.branch = check_type_opt(args.get("branch"), str) + + if not (self.ref or self.branch): + raise ValueError("Need to specify 'ref' or 'branch'") + + if self.ref and self.branch: + raise ValueError("Can't specify both 'ref' and 'branch'.") + + # should end with / if it's a dir + self.expect: List[str] = check_type(args.get("expect", []), list) + self.submodules = check_type(args.get("submodules", False), bool) + + def prepare(self, preparation: Preparation, head: Head) -> None: + super().prepare(preparation, head) + parent = str(Path(self.dest_dir).parent) + preparation.needs("directory", parent) + preparation.provides("directory", self.dest_dir) + + for expected in self.expect: + expected_path_str = str(Path(self.dest_dir, expected)) + if expected.endswith("/"): + preparation.provides("directory", expected_path_str) + else: + preparation.provides("file", expected_path_str) + + async def cook(self, k: Kitchen) -> None: + # no non-arg dependencies + k.get_dependency_tracker() + + stat = await k.ut1a(Stat(self.dest_dir), Stat.Result) + if stat is None: + # doesn't exist; git init it + await exec_no_fails( + k, ["git", "init", self.dest_dir], "/" + ) + + stat = await k.ut1a(Stat(self.dest_dir), Stat.Result) + if stat is None: + raise RuntimeError("Directory vanished after creation!") + + if not stat.dir: + raise RuntimeError("Already exists but not a dir: " + self.dest_dir) + + # add the remote, removing it first to ensure it's what we want + # don't care if removing fails + await k.ut1areq(SimpleExec(["git", "remote", "remove", "scone"], self.dest_dir), SimpleExec.Result) + await exec_no_fails( + k, ["git", "remote", "add", "scone", self.repo_src], self.dest_dir + ) + + # fetch the latest from the remote + await exec_no_fails( + k, ["git", "fetch", "scone"], self.dest_dir + ) + + # figure out what ref we want to use + # TODO(performance): fetch only this ref? + ref = self.ref or f"scone/{self.branch}" + + # switch to that ref + await exec_no_fails( + k, ["git", "switch", "--detach", ref], self.dest_dir + ) + + # if we use submodules + if self.submodules: + await exec_no_fails( + k, ["git", "submodule", "update", "--init", "--recursive"], self.dest_dir + ) + + for expected in self.expect: + expected_path_str = str(Path(self.dest_dir, expected)) + # TODO(performance, low): parallelise these + stat = await k.ut1a(Stat(expected_path_str), Stat.Result) + if not stat: + raise RuntimeError(f"expected {expected_path_str} to exist but it did not") + + if stat.dir and not expected.endswith("/"): + raise RuntimeError(f"expected {expected_path_str} to exist as a file but it is a dir") + + if not stat.dir and expected.endswith("/"): + raise RuntimeError(f"expected {expected_path_str} to exist as a dir but it is a file") diff --git a/scone/default/recipes/fridge.py b/scone/default/recipes/fridge.py new file mode 100644 index 0000000..c43ab04 --- /dev/null +++ b/scone/default/recipes/fridge.py @@ -0,0 +1,164 @@ +import asyncio +from asyncio import Future +from pathlib import Path +from typing import Dict, cast +from urllib.parse import urlparse +from urllib.request import urlretrieve + +from scone.common.misc import sha256_file +from scone.common.modeutils import DEFAULT_MODE_FILE, parse_mode +from scone.default.steps import fridge_steps +from scone.default.steps.fridge_steps import FridgeMetadata, load_and_transform, SUPERMARKET_RELATIVE +from scone.default.utensils.basic_utensils import WriteFile, Chown +from scone.head import Head +from scone.head.kitchen import Kitchen +from scone.head.recipe import Preparation, Recipe +from scone.head.utils import check_type + + +class FridgeCopy(Recipe): + """ + Declares that a file should be copied from the head to the sous. + """ + + _NAME = "fridge-copy" + + def __init__(self, host: str, slug: str, args: dict, head: Head): + super().__init__(host, slug, args, head) + fp = fridge_steps.search_in_fridge(head, args["src"]) + if fp is None: + raise ValueError(f"Cannot find {args['src']} in the fridge.") + + unextended_path_str, meta = fridge_steps.decode_fridge_extension(str(fp)) + unextended_path = Path(unextended_path_str) + + dest = args["dest"] + if not isinstance(dest, str): + raise ValueError("No destination provided or wrong type.") + + if dest.endswith("/"): + self.destination: Path = Path(args["dest"], unextended_path.parts[-1]) + else: + self.destination = Path(args["dest"]) + + mode = args.get("mode", DEFAULT_MODE_FILE) + assert isinstance(mode, str) or isinstance(mode, int) + + self.fridge_path: str = args["src"] + self.real_path: Path = fp + self.fridge_meta: FridgeMetadata = meta + self.mode = parse_mode(mode, directory=False) + + def prepare(self, preparation: Preparation, head: Head) -> None: + super().prepare(preparation, head) + preparation.provides("file", str(self.destination)) + preparation.needs("directory", str(self.destination.parent)) + + async def cook(self, k: Kitchen) -> None: + data = await load_and_transform( + k, self.fridge_meta, self.real_path, self.get_host() + ) + dest_str = str(self.destination) + chan = await k.start(WriteFile(dest_str, self.mode)) + await chan.send(data) + await chan.send(None) + if await chan.recv() != "OK": + raise RuntimeError(f"WriteFail failed on fridge-copy to {self.destination}") + + # this is the wrong thing + # hash_of_data = sha256_bytes(data) + # k.get_dependency_tracker().register_remote_file(dest_str, hash_of_data) + + await k.get_dependency_tracker().register_fridge_file( + self.fridge_path, self.real_path + ) + + +class Supermarket(Recipe): + """ + Downloads an asset (cached if necessary) and copies to sous. + """ + + _NAME = "supermarket" + + # dict of target path → future that will complete when it's downloaded + in_progress: Dict[str, Future] = dict() + + def __init__(self, host: str, slug: str, args: dict, head: "Head"): + super().__init__(host, slug, args, head) + self.url = args.get("url") + assert isinstance(self.url, str) + + self.sha256 = check_type(args.get("sha256"), str).lower() + + dest = args["dest"] + if not isinstance(dest, str): + raise ValueError("No destination provided or wrong type.") + + if dest.endswith("/"): + file_basename = urlparse(self.url).path.split("/")[-1] + self.destination: Path = Path(args["dest"], file_basename).resolve() + else: + self.destination = Path(args["dest"]).resolve() + + self.owner = check_type(args.get("owner", self.get_user(head)), str) + self.group = check_type(args.get("group", self.owner), str) + + mode = args.get("mode", DEFAULT_MODE_FILE) + assert isinstance(mode, str) or isinstance(mode, int) + self.mode = parse_mode(mode, directory=False) + + def prepare(self, preparation: Preparation, head: "Head"): + super().prepare(preparation, head) + preparation.provides("file", str(self.destination)) + + async def cook(self, kitchen: "Kitchen"): + # need to ensure we download only once, even in a race… + + supermarket_path = Path(kitchen.head.directory, SUPERMARKET_RELATIVE, self.sha256) + + if self.sha256 in Supermarket.in_progress: + await Supermarket.in_progress[self.sha256] + elif not supermarket_path.exists(): + note = f""" +Scone Supermarket + +This file corresponds to {self.url} + +Downloaded by {self} +""".strip() + + Supermarket.in_progress[self.sha256] = cast( + Future, + asyncio.get_running_loop().run_in_executor( + kitchen.head.pools.threaded, + self._download_file, + self.url, + str(supermarket_path), + self.sha256, + note + ), + ) + + # TODO(perf): load file in another thread + with open(supermarket_path, "r") as fin: + data = fin.read() + chan = await kitchen.start(WriteFile(str(self.destination), self.mode)) + await chan.send(data) + await chan.send(None) + if await chan.recv() != "OK": + raise RuntimeError(f"WriteFail failed on supermarket to {self.destination}") + + await kitchen.ut0(Chown(str(self.destination), self.owner, self.group)) + + @staticmethod + def _download_file(url: str, dest_path: str, check_sha256: str, note: str): + urlretrieve(url, dest_path) + real_sha256 = sha256_file(dest_path) + if real_sha256 != check_sha256: + raise RuntimeError( + f"sha256 hash mismatch {real_sha256} != {check_sha256} (wanted)" + ) + with open(dest_path + ".txt", "w") as fout: + # leave a note so we can find out what this is if we need to. + fout.write(note) diff --git a/scone/default/recipes/linux.py b/scone/default/recipes/linux.py new file mode 100644 index 0000000..eb16a56 --- /dev/null +++ b/scone/default/recipes/linux.py @@ -0,0 +1,64 @@ +import crypt +import logging +from typing import Optional + +from scone.default.steps import linux_steps +from scone.default.utensils.linux_utensils import GetPasswdEntry +from scone.head import Head, Recipe +from scone.head.kitchen import Kitchen +from scone.head.recipe import Preparation +from scone.head.utils import check_type, check_type_opt + +logger = logging.getLogger(__name__) + + +class LinuxUser(Recipe): + _NAME = "os-user" + + def __init__(self, host: str, slug: str, args: dict, head: Head): + super().__init__(host, slug, args, head) + if slug[0] == "@": + raise ValueError("os-user should be used like [os-user.username].") + + self.user_name = slug + self.make_group = check_type(args.get("make_group", True), bool) + self.make_home = check_type(args.get("make_home", True), bool) + self.home: Optional[str] = check_type_opt(args.get("home"), str) + self.password: Optional[str] = check_type_opt(args.get("password"), str) + + def prepare(self, preparation: Preparation, head: "Head") -> None: + super().prepare(preparation, head) + preparation.provides("os-user", self.user_name) + if self.make_group: + preparation.provides("os-group", self.user_name) + + async def cook(self, kitchen: Kitchen) -> None: + # TODO(documentation): note this does not update users + # acknowledge tracking + kitchen.get_dependency_tracker() + if self.password: + password_hash: Optional[str] = crypt.crypt(self.password) + else: + password_hash = None + + pwd_entry = await kitchen.ut1a( + GetPasswdEntry(self.user_name), GetPasswdEntry.Result + ) + + if pwd_entry: + logger.warning( + "Not updating existing os-user '%s' as it exists already and " + "modifications could be dangerous in any case. Modification " + "support may be implemented in the future.", + self.user_name, + ) + else: + # create the user fresh + await linux_steps.create_linux_user( + kitchen, + self.user_name, + password_hash, + self.make_home, + self.make_group, + self.home, + ) diff --git a/scone/default/recipes/postgres.py b/scone/default/recipes/postgres.py new file mode 100644 index 0000000..4eff21e --- /dev/null +++ b/scone/default/recipes/postgres.py @@ -0,0 +1,94 @@ +from scone.default.utensils.db_utensils import PostgresTransaction +from scone.head import Head, Recipe +from scone.head.kitchen import Kitchen +from scone.head.recipe import Preparation +from scone.head.utils import check_type + + +class PostgresDatabase(Recipe): + _NAME = "pg-db" + + def __init__(self, host: str, slug: str, args: dict, head: Head): + super().__init__(host, slug, args, head) + + self.database_name = slug + self.owner = check_type(args.get("owner"), str) + self.encoding = args.get("encoding", "utf8") + self.collate = args.get("collate", "en_GB.utf8") + self.ctype = args.get("ctype", "en_GB.utf8") + self.template = args.get("template", "template0") + + def prepare(self, preparation: Preparation, head: Head) -> None: + super().prepare(preparation, head) + # todo + + async def cook(self, kitchen: Kitchen) -> None: + ch = await kitchen.start(PostgresTransaction("postgres")) + await ch.send( + ( + "SELECT 1 AS count FROM pg_catalog.pg_database WHERE datname = ?;", + self.database_name, + ) + ) + dbs = await ch.recv() + if len(dbs) > 0 and dbs[0]["count"] == 1: + await ch.send(None) + await ch.wait_close() + return + + q = f""" + CREATE DATABASE {self.database_name} + WITH OWNER {self.owner} + ENCODING {self.encoding} + LC_COLLATE {self.collate} + LC_CTYPE {self.ctype} + TEMPLATE {self.template}; + """ + + await ch.send((q,)) + res = await ch.recv() + if len(res) != 0: + raise RuntimeError("expected empty result set.") + await ch.send(None) + await ch.wait_close() + + +class PostgresUser(Recipe): + _NAME = "pg-user" + + def __init__(self, host: str, slug: str, args: dict, head: Head): + super().__init__(host, slug, args, head) + + self.user_name = slug + self.password = check_type(args.get("password"), str) + + def prepare(self, preparation: Preparation, head: Head) -> None: + super().prepare(preparation, head) + # todo + + async def cook(self, kitchen: Kitchen) -> None: + ch = await kitchen.start(PostgresTransaction("postgres")) + await ch.send( + ( + "SELECT 1 AS count FROM pg_catalog.pg_user WHERE usename = ?;", + self.user_name, + ) + ) + dbs = await ch.recv() + if len(dbs) > 0 and dbs[0]["count"] == 1: + await ch.send(None) + await ch.wait_close() + return + + q = f""" + CREATE ROLE {self.user_name} + WITH PASSWORD ? + LOGIN; + """ + + await ch.send((q, self.password)) + res = await ch.recv() + if len(res) != 0: + raise RuntimeError("expected empty result set.") + await ch.send(None) + await ch.wait_close() diff --git a/scone/default/recipes/python.py b/scone/default/recipes/python.py new file mode 100644 index 0000000..1ed98a7 --- /dev/null +++ b/scone/default/recipes/python.py @@ -0,0 +1,82 @@ +from pathlib import Path +from typing import Tuple, List + +from scone.default.recipes.apt import AptPackage +from scone.default.steps.basic_steps import exec_no_fails +from scone.default.steps.filesystem_steps import depend_remote_file +from scone.head import Head, Recipe +from scone.head.kitchen import Kitchen +from scone.head.recipe import Preparation +from scone.head.utils import check_type + + +class PythonVenv(Recipe): + """ + Creates a Python virtualenv with a specified set of requirements. + + Note: using a directory as a dependency can be inefficient as dir SHA256 + will be performed to check it has not changed. + """ + + _NAME = "python-venv" + + def __init__(self, host: str, slug: str, args: dict, head: Head): + super().__init__(host, slug, args, head) + + self.dir = check_type(args.get("dir"), str) + self.interpreter = check_type(args.get("interpreter"), str) + # list of flags. Current supported: + # git (local git repo — track hash by git commit hash), dir, -r + self.install: List[Tuple[str, List[str]]] = [] + install_plaintext = check_type(args.get("install"), list) + for install_line in install_plaintext: + parts = install_line.split(" ") + self.install.append((parts[-1], parts[0:-1])) + + self.no_apt_install = check_type(args.get("_no_apt_install", False), bool) + + # TODO(sdists) + + def prepare(self, preparation: Preparation, head: Head): + super().prepare(preparation, head) + preparation.needs("dir", str(Path(self.dir).parent)) + + for name, flags in self.install: + if "-r" in flags: + preparation.needs("file", name) + elif "git" in flags or "dir" in flags: + preparation.needs("dir", name) + + final_script = str(Path(self.dir, "bin/python")) + preparation.provides("file", str(final_script)) + + if not self.no_apt_install: + preparation.subrecipe( + AptPackage( + self.get_host(), "@venv-apt", {"packages": ["python3-venv"]}, head + ) + ) + preparation.needs("apt-stage", "packages-installed") + + async def cook(self, kitchen: Kitchen): + dt = kitchen.get_dependency_tracker() + + await exec_no_fails( + kitchen, [self.interpreter, "-m", "venv", self.dir], "/" + ) + + install_args = [] + for name, flags in self.install: + if "-r" in flags: + install_args.append("-r") + await depend_remote_file(name, kitchen) + elif "dir" in flags or "git" in flags: + # TODO(perf, dedup): custom dynamic dependency types; git + # dependencies and sha256_dir dependencies. + dt.ignore() + + install_args.append(name) + + await exec_no_fails( + kitchen, [self.dir + "/bin/pip", "install"] + install_args, "/" + ) diff --git a/scone/default/recipes/systemd.py b/scone/default/recipes/systemd.py new file mode 100644 index 0000000..d244974 --- /dev/null +++ b/scone/default/recipes/systemd.py @@ -0,0 +1,114 @@ +from typing import Dict + +from scone.default.recipes.filesystem import CommandOnChange +from scone.default.utensils.basic_utensils import SimpleExec +from scone.head import Head, Recipe +from scone.head.kitchen import Kitchen +from scone.head.recipe import Preparation +from scone.head.utils import check_type, check_type_opt + + +class SystemdUnit(Recipe): + """ + Shorthand for a system unit. Metarecipe. + """ + + _NAME = "systemd" + + daemon_reloaders: Dict[str, CommandOnChange] = {} + + def __init__(self, host: str, slug: str, args: dict, head: Head): + super().__init__(host, slug, args, head) + + self.unit_name = slug if "." in slug else slug + ".service" + self.at = check_type(args.get("at"), str) + self.enabled = check_type_opt(args.get("enabled"), bool) + self.restart_on = check_type_opt(args.get("restart_on"), list) + + def prepare(self, preparation: Preparation, head: Head) -> None: + super().prepare(preparation, head) + preparation.provides("systemd-unit", self.unit_name) + preparation.needs("systemd-stage", "daemon-reloaded") + + if self.enabled is not None: + enable_recipe = SystemdEnabled( + self.get_host(), + self.unit_name, + {"enabled": self.enabled, "at": self.at, ".user": "root"}, + head, + ) + preparation.subrecipe(enable_recipe) + preparation.needs("systemd-stage", "enabled") + + daemon_reloader = SystemdUnit.daemon_reloaders.get(self.get_host(), None) + if not daemon_reloader: + # TODO this should be replaced with a dedicated command which provides + # those units. + daemon_reloader = CommandOnChange( + self.get_host(), + "systemd-internal", + { + "purpose": "systemd.daemon_reload", + "command": ["systemctl", "daemon-reload"], + "files": [], + ".user": "root", + }, + head, + ) + preparation.subrecipe(daemon_reloader) + file_list = getattr(daemon_reloader, "_args")["files"] + file_list.append(self.at) + + if self.restart_on: + service_reloader = CommandOnChange( + self.get_host(), + "systemd-internal", + { + "purpose": "systemd.unit_reload", + "command": ["systemctl", "reload", self.unit_name], + "files": self.restart_on + [self.at], + ".user": "root", + }, + head, + ) + preparation.subrecipe(service_reloader) + + async def cook(self, kitchen: Kitchen) -> None: + # metarecipes don't do anything. + kitchen.get_dependency_tracker().ignore() + + +class SystemdEnabled(Recipe): + """ + Sets the enabled state of the systemd unit. + """ + + _NAME = "systemd-enabled" + + def __init__(self, host: str, slug: str, args: dict, head: Head): + super().__init__(host, slug, args, head) + + self.unit_name = slug if "." in slug else slug + ".service" + self.at = check_type(args.get("at"), str) + self.enabled = check_type_opt(args.get("enabled"), bool) + + def prepare(self, preparation: Preparation, head: Head) -> None: + super().prepare(preparation, head) + preparation.needs("file", self.at) + preparation.needs("systemd-stage", "daemon-reloaded") + + async def cook(self, kitchen: Kitchen) -> None: + kitchen.get_dependency_tracker() + + result = await kitchen.ut1areq( + SimpleExec( + ["systemctl", "enable" if self.enabled else "disable", self.unit_name], + "/", + ), + SimpleExec.Result, + ) + + if result.exit_code != 0: + raise RuntimeError( + f"Failed to en/disable {self.unit_name}: {result.stderr.decode()}" + ) diff --git a/scone/default/steps/__init__.py b/scone/default/steps/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/scone/default/steps/basic_steps.py b/scone/default/steps/basic_steps.py new file mode 100644 index 0000000..b128959 --- /dev/null +++ b/scone/default/steps/basic_steps.py @@ -0,0 +1,57 @@ +from pathlib import PurePath +from typing import List, Optional, Union + +from scone.default.utensils.basic_utensils import SimpleExec +from scone.head import Recipe +from scone.head.exceptions import CookingError +from scone.head.kitchen import Kitchen, current_recipe + + +class ExecutionFailure(CookingError): + """ + A command failed. + """ + + def __init__( + self, + args: List[str], + working_dir: str, + sous: str, + user: str, + result: SimpleExec.Result, + ): + stderr = result.stderr.decode().replace("\n", "\n ") + + message = ( + f"Command failed on {sous} (user {user}) in {working_dir}.\n" + f"The command was: {args}\n" + f"Stderr was:\n {stderr}" + ) + + super().__init__(message) + + +async def exec_no_fails( + kitchen: Kitchen, args: List[str], working_dir: Union[str, PurePath] +) -> SimpleExec.Result: + if not isinstance(working_dir, str): + working_dir = str(working_dir) + + result = await kitchen.start_and_consume_attrs( + SimpleExec(args, working_dir), SimpleExec.Result + ) + + if result.exit_code != 0: + recipe: Optional[Recipe] = current_recipe.get(None) # type: ignore + if recipe: + raise ExecutionFailure( + args, + working_dir, + recipe.get_host(), + recipe.get_user(kitchen.head), + result, + ) + else: + raise ExecutionFailure(args, working_dir, "???", "???", result) + + return result diff --git a/scone/default/steps/filesystem_steps.py b/scone/default/steps/filesystem_steps.py new file mode 100644 index 0000000..cfcedad --- /dev/null +++ b/scone/default/steps/filesystem_steps.py @@ -0,0 +1,7 @@ +from scone.default.utensils.basic_utensils import HashFile +from scone.head.kitchen import Kitchen + + +async def depend_remote_file(path: str, kitchen: Kitchen) -> None: + sha256 = await kitchen.ut1(HashFile(path)) + kitchen.get_dependency_tracker().register_remote_file(path, sha256) diff --git a/scone/default/steps/fridge_steps.py b/scone/default/steps/fridge_steps.py new file mode 100644 index 0000000..0027c17 --- /dev/null +++ b/scone/default/steps/fridge_steps.py @@ -0,0 +1,74 @@ +from enum import Enum +from pathlib import Path, PurePath +from typing import List, Optional, Tuple, Union + +from jinja2 import Template + +from scone.head import Head +from scone.head.kitchen import Kitchen + + +SUPERMARKET_RELATIVE = ".scone-cache/supermarket" + + +def get_fridge_dirs(head: Head) -> List[Path]: + # TODO expand with per-sous/per-group dirs? + return [Path(head.directory, "fridge")] + + +def search_in_dirlist( + dirlist: List[Path], relative: Union[str, PurePath] +) -> Optional[Path]: + for directory in dirlist: + potential_path = directory.joinpath(relative) + if potential_path.exists(): + return potential_path + return None + + +def search_in_fridge(head: Head, relative: Union[str, PurePath]) -> Optional[Path]: + fridge_dirs = get_fridge_dirs(head) + return search_in_dirlist(fridge_dirs, relative) + + +class FridgeMetadata(Enum): + FRIDGE = 0 + FROZEN = 1 + TEMPLATE = 2 + + +def decode_fridge_extension(path: str) -> Tuple[str, FridgeMetadata]: + exts = { + ".frozen": FridgeMetadata.FROZEN, + ".j2": FridgeMetadata.TEMPLATE, + # don't know if we want to support .j2.frozen, but we could in the future + } + + for ext, meta in exts.items(): + if path.endswith(ext): + return path[: -len(ext)], meta + + return path, FridgeMetadata.FRIDGE + + +async def load_and_transform( + kitchen: Kitchen, meta: FridgeMetadata, fullpath: Path, sous: str +) -> bytes: + head = kitchen.head + # TODO(perf) don't do this in async loop + with fullpath.open("rb") as file: + data = file.read() + if meta == FridgeMetadata.FROZEN: + # decrypt + if head.secret_access is None: + raise RuntimeError("Frozen file but no secret access enabled!") + data = head.secret_access.decrypt_bytes(data) + elif meta == FridgeMetadata.TEMPLATE: + # pass through Jinja2 + template = Template(data.decode()) + proxies = kitchen.get_dependency_tracker().get_j2_compatible_dep_var_proxies( + head.variables[sous] + ) + data = template.render(proxies).encode() + print("data", fullpath, data) + return data diff --git a/scone/default/steps/linux_steps.py b/scone/default/steps/linux_steps.py new file mode 100644 index 0000000..ff78aeb --- /dev/null +++ b/scone/default/steps/linux_steps.py @@ -0,0 +1,41 @@ +from typing import Optional + +from scone.default.utensils.basic_utensils import SimpleExec +from scone.head.kitchen import Kitchen + + +async def create_linux_user( + kitchen: Kitchen, + name: str, + password_hash: Optional[str], + create_home: bool = True, + create_group: bool = True, + home: Optional[str] = None, +) -> None: + args = ["useradd"] + + if password_hash: + # N.B. if you don't use a password hash, the account will be locked + # but passwordless SSH still works + args += ["-p", password_hash] + + if create_home: + args.append("-m") + + if create_group: + args.append("-U") + else: + args.append("-N") + + if home: + args += ["-d", home] + + # finally, append the user name + args.append(name) + + result = await kitchen.ut1areq(SimpleExec(args, "/"), SimpleExec.Result) + + if result.exit_code != 0: + raise RuntimeError( + "Failed to create user. Error was: " + result.stderr.strip().decode() + ) diff --git a/scone/default/utensils/__init__.py b/scone/default/utensils/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/scone/default/utensils/basic_utensils.py b/scone/default/utensils/basic_utensils.py new file mode 100644 index 0000000..1382b4f --- /dev/null +++ b/scone/default/utensils/basic_utensils.py @@ -0,0 +1,149 @@ +import asyncio +import grp +import logging +import os +import pwd +import shutil +import stat +from typing import List + +import attr + +from scone.common.chanpro import Channel +from scone.common.misc import sha256_file +from scone.sous.utensils import Utensil, Worktop + + +@attr.s(auto_attribs=True) +class WriteFile(Utensil): + path: str + mode: int + + async def execute(self, channel: Channel, worktop): + oldumask = os.umask(0) + fdnum = os.open(self.path, os.O_WRONLY | os.O_CREAT | os.O_TRUNC, self.mode) + os.umask(oldumask) + + with open(fdnum, "wb") as file: + while True: + next_chunk = await channel.recv() + if next_chunk is None: + break + assert isinstance(next_chunk, bytes) + file.write(next_chunk) + + await channel.send("OK") + + +@attr.s(auto_attribs=True) +class MakeDirectory(Utensil): + path: str + mode: int + + async def execute(self, channel: Channel, worktop): + oldumask = os.umask(0) + os.mkdir(self.path, self.mode) + os.umask(oldumask) + + +@attr.s(auto_attribs=True) +class Stat(Utensil): + path: str + + logger = logging.getLogger(__name__) + + @attr.s(auto_attribs=True) + class Result: + uid: int + gid: int + dir: bool + user: str + group: str + mode: int + + async def execute(self, channel: Channel, worktop): + try: + self.logger.debug("going to stat") + stat_result = os.stat(self.path, follow_symlinks=False) + except FileNotFoundError: + self.logger.debug(":(") + await channel.send(None) + return + + self.logger.debug("going to user") + user = pwd.getpwuid(stat_result.st_uid).pw_name + self.logger.debug("going to grp") + group = grp.getgrgid(stat_result.st_gid).gr_name + self.logger.debug("going to respond") + + await channel.send( + Stat.Result( + uid=stat_result.st_uid, + gid=stat_result.st_gid, + dir=stat.S_ISDIR(stat_result.st_mode), + user=user, + group=group, + mode=stat_result.st_mode, + ) + ) + + +@attr.s(auto_attribs=True) +class Chown(Utensil): + path: str + user: str + group: str + + async def execute(self, channel: Channel, worktop): + shutil.chown(self.path, self.user, self.group) + + +@attr.s(auto_attribs=True) +class Chmod(Utensil): + path: str + mode: int + + async def execute(self, channel: Channel, worktop): + os.chmod(self.path, self.mode) + + +@attr.s(auto_attribs=True) +class SimpleExec(Utensil): + args: List[str] + working_dir: str + + @attr.s(auto_attribs=True) + class Result: + exit_code: int + stdout: bytes + stderr: bytes + + async def execute(self, channel: Channel, worktop: Worktop): + proc = await asyncio.create_subprocess_exec( + *self.args, + stdin=None, + stdout=asyncio.subprocess.PIPE, + stderr=asyncio.subprocess.PIPE, + cwd=self.working_dir + ) + + stdout, stderr = await proc.communicate() + + # send the result + exit_code = proc.returncode + assert exit_code is not None + + await channel.send( + SimpleExec.Result(exit_code=exit_code, stdout=stdout, stderr=stderr) + ) + + +@attr.s(auto_attribs=True) +class HashFile(Utensil): + path: str + + async def execute(self, channel: Channel, worktop: Worktop): + sha256 = await asyncio.get_running_loop().run_in_executor( + worktop.pools.threaded, sha256_file, self.path + ) + await channel.send(sha256) diff --git a/scone/default/utensils/db_utensils.py b/scone/default/utensils/db_utensils.py new file mode 100644 index 0000000..802e675 --- /dev/null +++ b/scone/default/utensils/db_utensils.py @@ -0,0 +1,25 @@ +import attr + +from scone.common.chanpro import Channel +from scone.sous import Utensil +from scone.sous.utensils import Worktop + + +@attr.s(auto_attribs=True) +class PostgresTransaction(Utensil): + database: str + + async def execute(self, channel: Channel, worktop: Worktop) -> None: + import asyncpg + + async with asyncpg.connect(database=self.database) as conn: + async with conn.transaction(): + while True: + query, *args = await channel.recv() + if query is None: + break + results = [ + dict(record) for record in await conn.fetch(query, *args) + ] + + await channel.send(results) diff --git a/scone/default/utensils/dynamic_dependencies.py b/scone/default/utensils/dynamic_dependencies.py new file mode 100644 index 0000000..4d14730 --- /dev/null +++ b/scone/default/utensils/dynamic_dependencies.py @@ -0,0 +1,74 @@ +import asyncio +import sqlite3 +from pathlib import Path +from typing import Dict, List + +import attr + +from scone.common.chanpro import Channel +from scone.common.misc import sha256_file +from scone.sous import Utensil +from scone.sous.utensils import Worktop + + +@attr.s(auto_attribs=True) +class CanSkipDynamic(Utensil): + sous_file_hashes: Dict[str, str] + + async def execute(self, channel: Channel, worktop: Worktop): + for file, tracked_hash in self.sous_file_hashes.items(): + try: + real_hash = await asyncio.get_running_loop().run_in_executor( + worktop.pools.threaded, sha256_file, file + ) + if real_hash != tracked_hash: + await channel.send(False) + return + except IOError: + await channel.send(False) + # TODO should we log this? NB includes FileNotFound... + return + + await channel.send(True) + + +@attr.s(auto_attribs=True) +class HasChangedInSousStore(Utensil): + purpose: str + paths: List[str] + + def _sync_execute(self, worktop: Worktop) -> bool: + with sqlite3.connect(Path(worktop.dir, "sous_store.db")) as db: + db.execute( + """ + CREATE TABLE IF NOT EXISTS hash_store + (purpose TEXT, path TEXT, hash TEXT, PRIMARY KEY (purpose, path)) + """ + ) + changed = False + for file in self.paths: + real_hash = sha256_file(file) + c = db.execute( + "SELECT hash FROM hash_store WHERE purpose=? AND path=?", + (self.purpose, file), + ) + db_hash = c.fetchone() + if db_hash is None: + changed = True + db.execute( + "INSERT INTO hash_store VALUES (?, ?, ?)", + (self.purpose, file, real_hash), + ) + elif db_hash[0] != real_hash: + changed = True + db.execute( + "UPDATE hash_store SET hash=? WHERE purpose=? AND path=?", + (real_hash, self.purpose, file), + ) + return changed + + async def execute(self, channel: Channel, worktop: Worktop): + answer = await asyncio.get_running_loop().run_in_executor( + worktop.pools.threaded, self._sync_execute, worktop + ) + await channel.send(answer) diff --git a/scone/default/utensils/linux_utensils.py b/scone/default/utensils/linux_utensils.py new file mode 100644 index 0000000..1bf2e41 --- /dev/null +++ b/scone/default/utensils/linux_utensils.py @@ -0,0 +1,35 @@ +import pwd + +import attr + +from scone.common.chanpro import Channel +from scone.sous import Utensil +from scone.sous.utensils import Worktop + + +@attr.s(auto_attribs=True) +class GetPasswdEntry(Utensil): + user_name: str + + @attr.s(auto_attribs=True) + class Result: + uid: int + gid: int + home: str + shell: str + + async def execute(self, channel: Channel, worktop: Worktop): + try: + entry = pwd.getpwnam(self.user_name) + except KeyError: + await channel.send(None) + return + + await channel.send( + GetPasswdEntry.Result( + uid=entry.pw_uid, + gid=entry.pw_gid, + home=entry.pw_dir, + shell=entry.pw_shell, + ) + ) diff --git a/scone/head/__init__.py b/scone/head/__init__.py new file mode 100644 index 0000000..20bef22 --- /dev/null +++ b/scone/head/__init__.py @@ -0,0 +1,201 @@ +import copy +import itertools +import logging +import re +import sys +from os import path +from pathlib import Path +from typing import Any, Dict, List, Optional, Tuple, cast + +import toml +from nacl.encoding import URLSafeBase64Encoder + +from scone.common.loader import ClassLoader +from scone.common.misc import eprint +from scone.common.pools import Pools +from scone.head import menu_reader +from scone.head.menu_reader import HostMenu, Menu +from scone.head.recipe import Recipe, recipe_name_getter +from scone.head.secrets import SecretAccess +from scone.head.variables import Variables, merge_right_into_left_inplace + +logger = logging.getLogger(__name__) + + +class Head: + def __init__( + self, + directory: str, + recipe_loader: ClassLoader[Recipe], + menu: Menu, + sous: Dict[str, dict], + groups: Dict[str, List[str]], + secret_access: Optional[SecretAccess], + pools: Pools, + ): + self.directory = directory + self.recipe_loader = recipe_loader + self.menu = menu + self.souss = sous + self.groups = groups + self.secret_access = secret_access + self.variables: Dict[str, Variables] = dict() + self.pools = pools + + @staticmethod + def open(directory: str): + with open(path.join(directory, "scone.head.toml")) as head_toml: + head_data = toml.load(head_toml) + + secret_access: Optional[SecretAccess] = None + if "freezer" in head_data and "restaurant_id" in head_data["freezer"]: + secret_access = SecretAccess(head_data["freezer"]["restaurant_id"]) + secret_access.get_existing() + if not secret_access.key: + eprint("Failed to load freezer secret.") + sys.exit(12) + + recipe_module_roots = head_data.get("recipe_roots", ["scone.default.recipes"]) + + # load available recipes + recipe_loader: ClassLoader[Recipe] = ClassLoader(Recipe, recipe_name_getter) + for recipe_root in recipe_module_roots: + recipe_loader.add_package_root(recipe_root) + + sous = head_data.get("sous", dict()) + groups = head_data.get("group", dict()) + groups["all"] = list(sous.keys()) + + # load the menu + menu = menu_reader.parse_toml_menu_descriptors(path.join(directory, "menu")) + + pools = Pools() + + head = Head(directory, recipe_loader, menu, sous, groups, secret_access, pools) + head._load_variables() + return head + + def _preload_variables(self, who_for: str) -> Tuple[dict, dict]: + out_frozen: Dict[str, Any] = {} + out_chilled: Dict[str, Any] = {} + vardir = Path(self.directory, "vars", who_for) + + logger.debug("preloading vars for %s in %s", who_for, str(vardir)) + + for file in vardir.glob("*.vf.toml"): + if not file.is_file(): + continue + with file.open() as var_file: + logger.debug("Opened %s for frozen vars", file) + frozen_vars = cast(Dict[Any, Any], toml.load(var_file)) + + merge_right_into_left_inplace(out_frozen, frozen_vars) + + for file in vardir.glob("*.v.toml"): + if not file.is_file(): + continue + with file.open() as var_file: + logger.debug("Opened %s for vars", file) + chilled_vars = cast(Dict[Any, Any], toml.load(var_file)) + + merge_right_into_left_inplace(out_chilled, chilled_vars) + + to_transform = [out_frozen] + while to_transform: + next_dict = to_transform.pop() + for k, v in next_dict.items(): + if isinstance(v, str): + b64_secret = re.sub(r"\s", "", v) + if not self.secret_access: + raise RuntimeError("Secret access disabled; cannot thaw.") + next_dict[k] = self.secret_access.decrypt_bytes( + b64_secret.encode(), encoder=URLSafeBase64Encoder + ).decode() + elif isinstance(v, dict): + to_transform.append(v) + else: + raise ValueError(f"Not permitted in frozen variables file: '{v}'.") + + return out_chilled, out_frozen + + def _load_variables(self): + preload: Dict[str, Tuple[dict, dict]] = dict() + for who_name in itertools.chain(self.souss, self.groups): + preload[who_name] = self._preload_variables(who_name) + + for sous_name in self.souss: + order = ["all"] + order += [ + group + for group, members in self.groups.items() + if sous_name in members and group != "all" + ] + order.append(sous_name) + + chilled: Dict[str, Any] = {} + frozen: Dict[str, Any] = {} + + for who_name in order: + in_chilled, in_frozen = preload[who_name] + merge_right_into_left_inplace(chilled, in_chilled) + merge_right_into_left_inplace(frozen, in_frozen) + + sous_vars = Variables() + sous_vars.load_plain(frozen) + sous_vars.load_vars_with_substitutions(chilled) + + self.variables[sous_name] = sous_vars + + def _construct_hostmenu_for( + self, hostmenu: HostMenu, host: str, recipe_list: List[Recipe], head: "Head" + ) -> None: + for recipe_id, dishes in hostmenu.dishes.items(): + recipe_cls = self.recipe_loader.get_class(recipe_id) + if not recipe_cls: + raise RuntimeError(f"Unable to find recipe class for '{recipe_id}'.") + for slug, args in dishes.items(): + args = copy.deepcopy(args) + self.variables[host].substitute_inplace_in_dict(args) + recipe = recipe_cls.from_menu(host, slug, args, head) + recipe_list.append(recipe) + + def construct_recipes(self): + recipes = {} + for sous in self.souss: + logger.debug("Constructing recipes for %s", sous) + sous_recipe_list: List[Recipe] = [] + + # construct recipes for it only + sous_hm = self.menu.hostmenus.get(sous) + if sous_hm is not None: + self._construct_hostmenu_for(sous_hm, sous, sous_recipe_list, self) + + # construct recipes for it that are for groups it is in + for group, members in self.groups.items(): + if sous in members: + group_hm = self.menu.hostmenus.get(group) + if group_hm is not None: + self._construct_hostmenu_for( + group_hm, sous, sous_recipe_list, self + ) + recipes[sous] = sous_recipe_list + logger.info("Constructed %d recipes for %s.", len(sous_recipe_list), sous) + return recipes + + def debug_info(self) -> str: + lines = [] + lines.append("Head Configuration") + lines.append(" Sous List") + for name, sous in self.souss.items(): + lines.append(f" - {name} = {sous}") + lines.append("") + lines.append(" Sous Groups") + for name, group in self.groups.items(): + lines.append(f" - {name} = {group}") + lines.append("") + lines += [" " + line for line in str(self.recipe_loader).splitlines()] + lines.append("") + lines += [" " + line for line in str(self.menu).splitlines()] + lines.append("") + + return "\n".join(lines) diff --git a/scone/head/cli/__init__.py b/scone/head/cli/__init__.py new file mode 100644 index 0000000..57fec96 --- /dev/null +++ b/scone/head/cli/__init__.py @@ -0,0 +1,132 @@ +import asyncio +import logging +import os +import sys +import time +from argparse import ArgumentParser +from pathlib import Path + +from scone.common.misc import eprint +from scone.common.pools import Pools +from scone.head import Head +from scone.head.dependency_tracking import DependencyCache, run_dep_checks +from scone.head.kitchen import Kitchen +from scone.head.recipe import Preparation, Recipe + + +def cli() -> None: + logging.basicConfig() + logging.getLogger("scone").setLevel(logging.DEBUG) + code = asyncio.get_event_loop().run_until_complete(cli_async()) + sys.exit(code) + + +async def cli_async() -> int: + dep_cache = None + try: + args = sys.argv[1:] + + parser = ArgumentParser(description="Cook!") + parser.add_argument("hostspec", type=str, help="Sous or group name") + parser.add_argument( + "--yes", + "-y", + action="store_true", + default=False, + help="Don't prompt for confirmation", + ) + argp = parser.parse_args(args) + + eprint("Loading head…") + + cdir = Path(os.getcwd()) + + while not Path(cdir, "scone.head.toml").exists(): + cdir = cdir.parent + if len(cdir.parts) <= 1: + eprint("Don't appear to be in a head. STOP.") + return 1 + + head = Head.open(str(cdir)) + + eprint(head.debug_info()) + + hosts = set() + + if argp.hostspec in head.souss: + hosts.add(argp.hostspec) + elif argp.hostspec in head.groups: + for sous in head.groups[argp.hostspec]: + hosts.add(sous) + else: + eprint(f"Unrecognised sous or group: '{argp.hostspec}'") + sys.exit(1) + + eprint(f"Selected the following souss: {', '.join(hosts)}") + + recipes_by_sous = head.construct_recipes() + + recipes_to_do = [] + for sous in hosts: + recipes_to_do += recipes_by_sous.get(sous, []) + + eprint(f"Preparing {len(recipes_to_do)} recipes…") + prepare = Preparation(recipes_to_do) + + start_ts = time.monotonic() + order = prepare.prepare(head) + notifying_provides = prepare.notifying_provides + del prepare + end_ts = time.monotonic() + eprint(f"Preparation completed in {end_ts - start_ts:.3f} s.") + eprint(f"{len(order)} courses planned.") + + dep_cache = await DependencyCache.open( + os.path.join(head.directory, "depcache.sqlite3") + ) + + eprint("Checking dependency cache…") + start_ts = time.monotonic() + depchecks = await run_dep_checks(head, dep_cache, order) + end_ts = time.monotonic() + eprint(f"Checking finished in {end_ts - start_ts:.3f} s.") # TODO show counts + + for epoch, items in enumerate(order): + print(f"----- Course {epoch} -----") + + for item in items: + if isinstance(item, Recipe): + state = depchecks[item].label.name + print(f" > recipe ({state}) {item}") + elif isinstance(item, tuple): + kind, ident, extra = item + print(f" - we now have {kind} {ident} {dict(extra)}") + + eprint("Ready to cook? [y/N]: ", end="") + if argp.yes: + eprint("y (due to --yes)") + else: + if not input().lower().startswith("y"): + eprint("Stopping.") + return 101 + + kitchen = Kitchen(head, dep_cache, notifying_provides) + + for epoch, epoch_items in enumerate(order): + print(f"Cooking Course {epoch} of {len(order)}") + await kitchen.run_epoch( + epoch_items, depchecks, concurrency_limit_per_host=8 + ) + + for sous in hosts: + await dep_cache.sweep_old(sous) + + return 0 + finally: + Pools.get().shutdown() + if dep_cache: + await dep_cache.db.close() + + +if __name__ == "__main__": + cli() diff --git a/scone/head/cli/freezer.py b/scone/head/cli/freezer.py new file mode 100644 index 0000000..48cdab2 --- /dev/null +++ b/scone/head/cli/freezer.py @@ -0,0 +1,149 @@ +import os +import re +import sys +from os.path import join +from pathlib import Path + +import toml +from nacl.encoding import URLSafeBase64Encoder + +from scone.common.misc import eprint +from scone.head.secrets import SecretAccess + + +def cli() -> None: + args = sys.argv[1:] + + if len(args) < 1: + eprint("Not enough arguments.") + eprint("Usage: scone-freezer :") + eprint(" freezefile [small files only for now!]") + eprint(" thawfile ") + eprint(" freezevar (value as 1 line in stdin)") + eprint(" thawvar ") + eprint(" genkey") + eprint(" test") + sys.exit(127) + + cdir = Path(os.getcwd()) + + while not Path(cdir, "scone.head.toml").exists(): + cdir = cdir.parent + if len(cdir.parts) <= 1: + eprint("Don't appear to be in a head. STOP.") + sys.exit(1) + + with open(join(cdir, "scone.head.toml")) as head_toml: + head_data = toml.load(head_toml) + + if "freezer" in head_data and "restaurant_id" in head_data["freezer"]: + restaurant_id = head_data["freezer"]["restaurant_id"] + else: + eprint("Tip: Set a freezer.restaurant_id in your scone.head.toml") + eprint(" to enable the ability to store your secret in the secret service.") + restaurant_id = None + + secret_access = SecretAccess(restaurant_id) + + if args[0] == "genkey": + assert len(args) == 1 + secret_access.generate_new() + elif args[0] == "test": + secret_access.get_existing() + if secret_access.key: + eprint("Great! Key found.") + else: + eprint("Oh no! Key not found.") + elif args[0] == "freezefile": + secret_access.get_existing() + if not secret_access.key: + eprint("No key found!") + sys.exit(12) + + assert len(args) >= 2 + filepaths = [Path(p) for p in args[1:]] + ec = 0 + + for path in filepaths: + if not path.exists(): + eprint(f"Can't freeze: no such file '{path}'") + sys.exit(10) + + for path in filepaths: + eprint(f"Freezing {path}") + if not path.is_file(): + eprint(f"Can't freeze (skipping): not a regular file '{path}'") + ec = 5 + + # slurping here for simplicity; + file_bytes = path.read_bytes() + enc_bytes = secret_access.encrypt_bytes(file_bytes) + dest_path = Path(str(path) + ".frozen") + dest_path.write_bytes(enc_bytes) + + sys.exit(ec) + elif args[0] == "thawfile": + secret_access.get_existing() + if not secret_access.key: + eprint("No key found!") + sys.exit(12) + + assert len(args) >= 2 + filepaths = [Path(p) for p in args[1:]] + ec = 0 + + for path in filepaths: + if not path.exists(): + eprint(f"Can't thaw: no such file '{path}'") + sys.exit(10) + + for path in filepaths: + eprint(f"Thawing {path}") + if not path.is_file(): + eprint(f"Can't thaw (skipping): not a regular file '{path}'") + ec = 5 + continue + + pathstr = str(path) + if not pathstr.endswith(".frozen"): + eprint(f"Can't thaw (skipping): not .frozen '{path}'") + continue + + # slurping here for simplicity; + file_bytes = path.read_bytes() + dec_bytes = secret_access.decrypt_bytes(file_bytes) + dest_path = Path(str(pathstr[: -len(".frozen")])) + dest_path.write_bytes(dec_bytes) + + sys.exit(ec) + elif args[0] == "freezevar": + assert len(args) == 2 + secret_access.get_existing() + if not secret_access.key: + eprint("No key found!") + sys.exit(12) + key = args[1] + eprint("Enter value to freeze: ", end="", flush=True) + value = input() + enc_b64 = secret_access.encrypt_bytes( + value.encode(), encoder=URLSafeBase64Encoder + ).decode() + n = 78 + str_contents = "\n".join( + [" " + enc_b64[i : i + n] for i in range(0, len(enc_b64), n)] + ) + print(f'{key} = """') + print(str_contents) + print('"""') + elif args[0] == "thawvar": + assert len(args) == 1 + secret_access.get_existing() + if not secret_access.key: + eprint("No key found!") + sys.exit(12) + eprint("Enter base64 to thaw (whitespace removed painlessly) then EOF (^D):") + value = re.sub(r"\s", "", sys.stdin.read()) + dec_str = secret_access.decrypt_bytes( + value.encode(), encoder=URLSafeBase64Encoder + ).decode() + print(dec_str) diff --git a/scone/head/cli/michelin.py b/scone/head/cli/michelin.py new file mode 100644 index 0000000..63dfbc2 --- /dev/null +++ b/scone/head/cli/michelin.py @@ -0,0 +1,26 @@ +import asyncio +import sys +from argparse import ArgumentParser + + +def cli() -> None: + code = asyncio.get_event_loop().run_until_complete(cli_async()) + sys.exit(code) + + +async def cli_async() -> int: + args = sys.argv[1:] + + parser = ArgumentParser(description="Compose a menu!") + subs = parser.add_subparsers() + supermarket = subs.add_parser("supermarket", help="generate a [[supermarket]] dish") + supermarket.add_argument("url", help="HTTPS URL to download") + supermarket.add_argument("-a", "--as", help="Alternative filename") + supermarket.set_defaults(func=supermarket_cli) + + argp = parser.parse_args(args) + return await argp.func(argp) + + +async def supermarket_cli(argp) -> int: + return 0 # TODO diff --git a/scone/head/dependency_tracking.py b/scone/head/dependency_tracking.py new file mode 100644 index 0000000..d2fe5df --- /dev/null +++ b/scone/head/dependency_tracking.py @@ -0,0 +1,366 @@ +import asyncio +import json +import logging +import time +from asyncio import Queue +from enum import Enum +from hashlib import sha256 +from typing import Any, Dict, List, NamedTuple, Optional, Tuple, Union + +import aiosqlite +import attr +import canonicaljson +import cattr +from aiosqlite import Connection + +from scone.common.misc import sha256_file +from scone.common.pools import Pools +from scone.head import Head, Recipe, Variables, recipe_name_getter +from scone.head.recipe import DepEle + +canonicaljson.set_json_library(json) +logger = logging.getLogger(__name__) + +# TODO(security, low): how to prevent passwords being recovered from the +# paramhashes in a dependency store? + +# TODO(correctness, perf): recipes with @src@0 slugs should not be registered +# to a slug. + + +def _canonicalise_dict(input: Dict[str, Any]) -> Dict[str, Any]: + output: Dict[str, Any] = {} + for key, value in input.items(): + if isinstance(value, dict): + output[key] = _canonicalise_dict(value) + elif isinstance(value, set): + new_list = list(value) + new_list.sort() + output[key] = new_list + else: + output[key] = value + return output + + +def hash_dict(value: dict) -> str: + return sha256( + canonicaljson.encode_canonical_json(_canonicalise_dict(value)) + ).hexdigest() + + +def paramhash_recipe(recipe: Recipe) -> str: + args = getattr(recipe, "_args").copy() + del args[".source"] + return hash_dict(args) + + +@attr.s(auto_attribs=True) +class DependencyBook: + var_names: List[str] + var_hash: str + fridge_hashes: Dict[str, str] + recipe_revisions: Dict[str, int] + dyn_sous_file_hashes: Dict[str, str] + + async def can_skip_static(self, head: Head, recipe: Recipe) -> bool: + from scone.default.steps.fridge_steps import search_in_fridge + + # start with variables + sous_vars = head.variables[recipe.get_host()] + var_comp = dict() + for var_name in self.var_names: + try: + var_comp[var_name] = sous_vars.get_dotted(var_name) + except KeyError: + return False + + if hash_dict(var_comp) != self.var_hash: + return False + + # now we have to check files in the fridge + for fridge_name, expected_hash in self.fridge_hashes.items(): + real_pathstr = search_in_fridge(head, fridge_name) + if not real_pathstr: + # vanished locally; that counts as a change + return False + real_hash = await asyncio.get_running_loop().run_in_executor( + head.pools.threaded, sha256_file, real_pathstr + ) + if real_hash != expected_hash: + return False + + return True + + def has_dynamic(self) -> bool: + return len(self.dyn_sous_file_hashes) > 0 + + +class DependencyTracker: + """ + Tracks the dependencies of a task and then inserts a row as needed. + """ + + def __init__(self, pools: Pools): + self._vars: Dict[str, Any] = {} + self._fridge: Dict[str, str] = {} + self._recipe_revisions: Dict[str, int] = {} + self._dyn_sous_files: Dict[str, str] = {} + self._ignored = False + self._pools = pools + + def ignore(self): + """ + Call when dependency tracking is not desired (or not advanced enough to + be useful.) + """ + self._ignored = True + + async def register_fridge_file(self, fridge_path: str, real_path: str): + if fridge_path not in self._fridge: + f_hash = await asyncio.get_running_loop().run_in_executor( + self._pools.threaded, sha256_file, real_path + ) + self._fridge[fridge_path] = f_hash + + def register_recipe(self, recipe: Recipe): + cls = recipe.__class__ + rec_name = recipe_name_getter(cls) + if not rec_name: + return + self._recipe_revisions[rec_name] = getattr(cls, "_REVISION", None) + + def register_variable(self, variable: str, value: Union[dict, str, int]): + self._vars[variable] = value + + def register_remote_file(self, file: str, file_hash: str): + self._dyn_sous_files[file] = file_hash + + def make_depbook(self) -> Optional[DependencyBook]: + if self._ignored: + return None + dep_book = DependencyBook( + list(self._vars.keys()), + hash_dict(self._vars), + self._fridge.copy(), + self._recipe_revisions, + self._dyn_sous_files, + ) + return dep_book + + def get_j2_compatible_dep_var_proxies( + self, variables: Variables + ) -> Dict[str, "DependencyVarProxy"]: + result = {} + + for key, vars in variables.toplevel().items(): + result[key] = DependencyVarProxy(self, vars, key + ".") + + return result + + +class DependencyVarProxy: + """ + Provides convenient access to variables that also properly tracks + dependencies. + """ + + def __init__( + self, dependency_tracker: DependencyTracker, variables: dict, prefix: str = "" + ): + self._dvp_dt: DependencyTracker = dependency_tracker + self._dvp_prefix = prefix + self._dvp_vars = variables + + def __getattr__(self, key: str): + fully_qualified_varname = self._dvp_prefix + key + value = self._dvp_vars.get(key, ...) + if value is ...: + raise KeyError(f"Variable does not exist: {fully_qualified_varname}") + elif isinstance(value, dict): + return DependencyVarProxy(self._dvp_dt, value, key + ".") + else: + self._dvp_dt.register_variable(fully_qualified_varname, value) + return value + + +class DependencyCache: + def __init__(self): + self.db: Connection = None # type: ignore + self.time = int(time.time() * 1000) + + @classmethod + async def open(cls, path: str) -> "DependencyCache": + dc = DependencyCache() + dc.db = await aiosqlite.connect(path) + await dc.db.executescript( + """ + CREATE TABLE IF NOT EXISTS dishcache ( + source_file TEXT, + host TEXT, + recipe_id TEXT, + slug TEXT, + paramhash TEXT, + dep_book TEXT, + ts INT, + PRIMARY KEY (source_file, host, recipe_id, slug, paramhash) + ); + CREATE INDEX IF NOT EXISTS dishcache_ts ON dishcache (ts); + """ + ) + await dc.db.commit() + return dc + + async def sweep_old(self, host: str): + # TODO(scope creep) allow sweeping only certain source files + # so we can do partial execution. + await self.db.execute( + """ + DELETE FROM dishcache + WHERE host = ? + AND ts < ? + """, + (host, self.time), + ) + await self.db.commit() + + async def inquire(self, recipe: Recipe) -> Optional[Tuple[int, DependencyBook]]: + paramhash = paramhash_recipe(recipe) + rows = await self.db.execute_fetchall( + """ + SELECT rowid, dep_book FROM dishcache + WHERE source_file = ? + AND host = ? + AND recipe_id = ? + AND paramhash = ? + AND slug = ? + LIMIT 1 + """, + ( + recipe._args[".source"][0], + recipe.get_host(), + recipe_name_getter(recipe.__class__), + paramhash, + recipe._slug, + ), + ) + rows = list(rows) + if not rows: + return None + + (rowid, dep_book_json) = rows[0] + + try: + dep_book = cattr.structure(json.loads(dep_book_json), DependencyBook) + except Exception: + logger.error( + "Failed to structure DependencyBook: %s", dep_book_json, exc_info=True + ) + raise + + return rowid, dep_book + + async def register(self, recipe: Recipe, dep_book: DependencyBook): + paramhash = paramhash_recipe(recipe) + await self.db.execute( + """ + INSERT INTO dishcache + (source_file, host, recipe_id, slug, paramhash, dep_book, ts) + VALUES (?, ?, ?, ?, ?, ?, ?) + ON CONFLICT (source_file, host, recipe_id, paramhash, slug) + DO UPDATE SET + dep_book = excluded.dep_book, + ts = excluded.ts + """, + ( + recipe._args[".source"][0], + recipe.get_host(), + recipe_name_getter(recipe.__class__), + recipe._slug, + paramhash, + canonicaljson.encode_canonical_json(cattr.unstructure(dep_book)), + self.time, + ), + ) + await self.db.commit() + + async def renew(self, rowid: int): + # TODO(perf): batch up many renews + await self.db.execute( + """ + UPDATE dishcache SET ts = ? WHERE rowid = ? LIMIT 1; + """, + (self.time, rowid), + ) + await self.db.commit() + + +class CheckOutcomeLabel(Enum): + # Not in dependency cache, so must run. + NOT_CACHED = 0 + + # Dependency cache suggests we must rerun + MUST_REDO = 1 + + # Dependency cache suggests we are fine if dynamic dependencies haven't + # changed + CHECK_DYNAMIC = 2 + + # Dependency cache says we can skip; there are no dynamic dependencies + SAFE_TO_SKIP = 3 + + +DepCheckOutcome = NamedTuple( + "DepCheckOutcome", + (("label", CheckOutcomeLabel), ("book", Optional[DependencyBook])), +) + + +async def run_dep_checks( + head: Head, dep_cache: DependencyCache, order: List[List[DepEle]] +) -> Dict[Recipe, DepCheckOutcome]: + queue: Queue[Optional[Recipe]] = Queue(32) + outcomes = {} + + async def consumer(): + while True: + recipe = await queue.get() + if not recipe: + break + t = await dep_cache.inquire(recipe) + if t: + # we need to check if dependencies have changed… + rowid, dep_book = t + if await dep_book.can_skip_static(head, recipe): + # we will renew either way + await dep_cache.renew(rowid) + if dep_book.has_dynamic(): + # has dynamic dependencies + outcomes[recipe] = DepCheckOutcome( + CheckOutcomeLabel.CHECK_DYNAMIC, dep_book + ) + else: + # can skip! + outcomes[recipe] = DepCheckOutcome( + CheckOutcomeLabel.SAFE_TO_SKIP, None + ) + else: + outcomes[recipe] = DepCheckOutcome( + CheckOutcomeLabel.MUST_REDO, None + ) + else: + outcomes[recipe] = DepCheckOutcome(CheckOutcomeLabel.NOT_CACHED, None) + queue.task_done() + + async def producer(): + for course in order: + for recipe in course: + if isinstance(recipe, Recipe): + await queue.put(recipe) + await queue.join() + for worker in consumers: + await queue.put(None) + + consumers = [asyncio.create_task(consumer()) for _ in range(8)] + await asyncio.gather(*consumers, producer(), return_exceptions=False) + + return outcomes diff --git a/scone/head/exceptions.py b/scone/head/exceptions.py new file mode 100644 index 0000000..1122151 --- /dev/null +++ b/scone/head/exceptions.py @@ -0,0 +1,6 @@ +class CookingError(Exception): + """ + Error in cooking. + """ + + pass diff --git a/scone/head/kitchen.py b/scone/head/kitchen.py new file mode 100644 index 0000000..b814527 --- /dev/null +++ b/scone/head/kitchen.py @@ -0,0 +1,192 @@ +import asyncio +import logging +from asyncio import Future +from collections import defaultdict +from contextvars import ContextVar +from typing import Any, Coroutine, Dict, List, Optional, Tuple, Type, TypeVar + +import attr +import cattr + +from scone.common.chanpro import Channel, ChanProHead +from scone.default.utensils.dynamic_dependencies import CanSkipDynamic +from scone.head import Head, Recipe, sshconn +from scone.head.dependency_tracking import ( + CheckOutcomeLabel, + DepCheckOutcome, + DependencyCache, + DependencyTracker, +) +from scone.head.recipe import DepEle, DependencySpec +from scone.sous import utensil_namer +from scone.sous.utensils import Utensil + + +logger = logging.getLogger(__name__) + +current_recipe: ContextVar[Recipe] = ContextVar("current_recipe") + +A = TypeVar("A") + + +class Kitchen: + def __init__( + self, + head: Head, + dependency_store: DependencyCache, + notifying_provides: Dict[Recipe, List[DependencySpec]], + ): + self._chanproheads: Dict[Tuple[str, str], Future[ChanProHead]] = dict() + self._dependency_store = dependency_store + self._dependency_trackers: Dict[Recipe, DependencyTracker] = defaultdict( + lambda: DependencyTracker(head.pools) + ) + self.head = head + self._notifying_provides = notifying_provides + self.notifications: Dict[DependencySpec, bool] = dict() + + def get_dependency_tracker(self): + return self._dependency_trackers[current_recipe.get()] + + async def get_chanprohead(self, host: str, user: str) -> ChanProHead: + async def new_conn(): + connection_details = self.head.souss[host] + # XXX opt ckey = + # os.path.join(self.head.directory, connection_details["clientkey"]) + + try: + cp, root = await sshconn.open_ssh_sous( + connection_details["host"], + connection_details["user"], + None, + user, + connection_details["souscmd"], + connection_details.get("dangerous_debug_logging", False) + ) + except Exception: + logger.error("Failed to open SSH connection", exc_info=True) + raise + + return ChanProHead(cp, root) + + hostuser = (host, user) + if hostuser not in self._chanproheads: + self._chanproheads[hostuser] = asyncio.create_task(new_conn()) + + return await self._chanproheads[hostuser] + + async def run_epoch( + self, + epoch: List[DepEle], + depchecks: Dict[Recipe, DepCheckOutcome], + concurrency_limit_per_host: int = 5, + ): + per_host_lists: Dict[str, List[Recipe]] = defaultdict(lambda: []) + + # sort into per-host lists + for recipe in epoch: + if isinstance(recipe, Recipe): + if depchecks[recipe].label != CheckOutcomeLabel.SAFE_TO_SKIP: + per_host_lists[recipe.get_host()].append(recipe) + + coros: List[Coroutine] = [] + + for host, recipes in per_host_lists.items(): + host_work_pool = HostWorkPool(recipes, depchecks) + coros.append(host_work_pool.cook_all(self, concurrency_limit_per_host)) + + await asyncio.gather(*coros, return_exceptions=False) + + async def start(self, utensil: Utensil) -> Channel: + utensil_name = utensil_namer(utensil.__class__) + recipe = current_recipe.get() + cph = await self.get_chanprohead(recipe.get_host(), recipe.get_user(self.head)) + + # noinspection PyDataclass + payload = cattr.unstructure(utensil) + + return await cph.start_command_channel(utensil_name, payload) + + ut = start + + async def start_and_consume(self, utensil: Utensil) -> Any: + channel = await self.start(utensil) + return await channel.consume() + + ut1 = start_and_consume + + async def start_and_consume_attrs_optional( + self, utensil: Utensil, attr_class: Type[A] + ) -> Optional[A]: + value = await self.start_and_consume(utensil) + if value is None: + return None + return cattr.structure(value, attr_class) + + ut1a = start_and_consume_attrs_optional + + async def start_and_consume_attrs(self, utensil: Utensil, attr_class: Type[A]) -> A: + value = await self.start_and_consume_attrs_optional(utensil, attr_class) + if value is None: + raise ValueError("Received None") + return value + + ut1areq = start_and_consume_attrs + + async def start_and_wait_close(self, utensil: Utensil) -> Any: + channel = await self.start(utensil) + return await channel.wait_close() + + ut0 = start_and_wait_close + + async def _store_dependency(self, recipe: Recipe): + dependency_tracker = self._dependency_trackers.pop(recipe, None) + if not dependency_tracker: + raise KeyError(f"Recipe {recipe} has not been tracked.") + depbook = dependency_tracker.make_depbook() + if depbook: + await self._dependency_store.register(recipe, depbook) + + +@attr.s(auto_attribs=True) +class HostWorkPool: + jobs: List[Recipe] + depchecks: Dict[Recipe, DepCheckOutcome] + next_job: int = 0 + + async def cook_all(self, kitchen: Kitchen, concurrency_limit: int): + num_jobs = len(self.jobs) + concurrency_limit = min(num_jobs, concurrency_limit) + + async def cooker(): + while self.next_job < num_jobs: + recipe = self.jobs[self.next_job] + self.next_job += 1 + + current_recipe.set(recipe) + depcheck = self.depchecks.get(recipe) + if ( + depcheck is not None + and depcheck.label == CheckOutcomeLabel.CHECK_DYNAMIC + ): + book = depcheck.book + assert book is not None + can_skip = await kitchen.ut1( + CanSkipDynamic(book.dyn_sous_file_hashes) + ) + if can_skip: + continue + + await recipe.cook(kitchen) + # if successful, store dependencies + await kitchen._store_dependency(recipe) + nps = kitchen._notifying_provides.get(recipe, None) + if nps: + for depspec in nps: + if depspec not in kitchen.notifications: + # default to changed if not told otherwise + kitchen.notifications[depspec] = True + + await asyncio.gather( + *[asyncio.create_task(cooker()) for _ in range(concurrency_limit)] + ) diff --git a/scone/head/menu_reader.py b/scone/head/menu_reader.py new file mode 100644 index 0000000..604fbd0 --- /dev/null +++ b/scone/head/menu_reader.py @@ -0,0 +1,125 @@ +import os +from os import path +from pathlib import Path +from typing import Any, Dict + +import toml + + +class Menu: + def __init__(self): + self.hostmenus = {} + + def get_host(self, name: str): + if name in self.hostmenus: + return self.hostmenus[name] + else: + new = HostMenu() + self.hostmenus[name] = new + return new + + def __str__(self): + lines = ["Menu"] + + for hostspec, hostmenu in self.hostmenus.items(): + lines.append(f" on {hostspec} :-") + lines += [" " + line for line in str(hostmenu).split("\n")] + lines.append("") + + return "\n".join(lines) + + +class HostMenu: + def __init__(self): + self.dishes = {} + + def __str__(self): + lines = ["Menu"] + + for recipe, dishes in self.dishes.items(): + lines.append(f"- recipe {recipe}") + lines += [f" - {slug} {args}" for slug, args in dishes.items()] + lines.append("") + + return "\n".join(lines) + + +def parse_toml_menu_descriptor( + filename: str, menu: Menu, default_hostspec: str, source_name: str = None +) -> None: + source_name = source_name or filename + + with open(filename, "r") as f: + menu_desc: Dict[str, Any] = toml.load(f) # type: ignore + + if "-----" in menu_desc: + magic_tweaks = menu_desc["-----"] + del menu_desc["-----"] + else: + magic_tweaks = {} + + for key, dishes in menu_desc.items(): + # print(key, "=", dishes) + key_parts = key.split("--") + lkp = len(key_parts) + if lkp == 1: + # pg-db.synapse + hostspec = default_hostspec + recipe = key_parts[0] + elif lkp == 2: + if key_parts[1] == "": + # fridge-copy-- + hostspec = default_hostspec + recipe = key_parts[0] + else: + # server1--pg-db.synapse + hostspec = key_parts[0] + recipe = key_parts[1] + elif lkp == 3 and key_parts[2] == "": + # server2--fridge-copy-- + hostspec = key_parts[0] + recipe = key_parts[1] + else: + raise ValueError(f"Don't understand key: {key}") + + hostmenu = menu.get_host(hostspec) + if recipe in hostmenu.dishes: + mdishes = hostmenu.dishes[recipe] + else: + mdishes = {} + hostmenu.dishes[recipe] = mdishes + + if isinstance(dishes, dict): + for slug, args in dishes.items(): + if slug in mdishes: + raise ValueError( + f"Conflict in: Host {hostspec} Recipe {recipe} Dish Slug {slug}" + ) + mdishes[slug] = args + args[".source"] = (source_name, key, slug) + args[".m"] = magic_tweaks + elif isinstance(dishes, list): + for idx, args in enumerate(dishes): + slug = f"@{source_name}@{idx}" + if slug in mdishes: + raise ValueError( + f"Conflict in: Host {hostspec} Recipe {recipe} Dish Slug {slug}" + ) + mdishes[slug] = args + args[".source"] = (source_name, key, idx) + args[".m"] = magic_tweaks + + +def parse_toml_menu_descriptors(menu_dir: str) -> Menu: + menu = Menu() + for root, dirs, files in os.walk(menu_dir): + for file in files: + full_path = path.join(root, file) + if file.endswith(".toml"): + # load this as a menu file + pieces = file.split(".") + default_hostspec = pieces[-2] + relative = str(Path(full_path).relative_to(menu_dir)) + parse_toml_menu_descriptor(full_path, menu, default_hostspec, relative) + + return menu diff --git a/scone/head/recipe.py b/scone/head/recipe.py new file mode 100644 index 0000000..66ba443 --- /dev/null +++ b/scone/head/recipe.py @@ -0,0 +1,171 @@ +import typing +from collections import defaultdict +from typing import Any, Dict, List, Optional, Set, Tuple, Union + +import toposort + +if typing.TYPE_CHECKING: + from scone.head import Head + from scone.head.kitchen import Kitchen + +DependencySpec = Tuple[str, str, frozenset] +DepEle = Union["Recipe", DependencySpec] + + +class Preparation: + """ + Preparation on a single host. + This is done before we start deduplicating (could this be improved?). + """ + + def __init__(self, recipes: List["Recipe"]): + self._to_process = recipes.copy() + self.recipes = recipes + self._dependencies: Dict[DepEle, Set[DepEle]] = {} + self._recipe_now: Optional[Recipe] = None + self._hard_needs: Set[DependencySpec] = set() + self.notifying_provides: Dict[Recipe, List[DependencySpec]] = defaultdict( + lambda: [] + ) + + def _make_depspec_tuple( + self, requirement: str, identifier: str, **extra_identifiers: Any + ) -> DependencySpec: + if "host" not in extra_identifiers: + assert self._recipe_now is not None + extra_identifiers["host"] = self._recipe_now.get_host() + return requirement, identifier, frozenset(extra_identifiers.items()) + + def needs( + self, + requirement: str, + identifier: str, + hard: bool = False, + **extra_identifiers: Any, + ) -> None: + assert self._recipe_now is not None + if self._recipe_now not in self._dependencies: + self._dependencies[self._recipe_now] = set() + depspec_tuple = self._make_depspec_tuple( + requirement, identifier, **extra_identifiers + ) + self._dependencies[self._recipe_now].add(depspec_tuple) + if hard: + self._hard_needs.add(depspec_tuple) + + def provides( + self, + requirement: str, + identifier: str, + notifying: bool = False, + **extra_identifiers: Any, + ) -> None: + assert self._recipe_now is not None + depspec_tuple = self._make_depspec_tuple( + requirement, identifier, **extra_identifiers + ) + if depspec_tuple not in self._dependencies: + self._dependencies[depspec_tuple] = set() + self._dependencies[depspec_tuple].add(self._recipe_now) + if notifying: + self.notifying_provides[self._recipe_now].append(depspec_tuple) + + def subrecipe(self, recipe: "Recipe"): + assert self._recipe_now is not None + self._to_process.append(recipe) + self.recipes.append(recipe) + args = getattr(recipe, "_args") + if ".source" not in args: + file, key, slug = getattr(self._recipe_now, "_args")[".source"] + args[".source"] = (file, key + "-sub", slug) + + def prepare(self, head: "Head") -> List[List]: + while self._to_process: + next_recipe = self._to_process.pop() + self._recipe_now = next_recipe + next_recipe.prepare(self, head) + + for hard_need in self._hard_needs: + if hard_need not in self._dependencies: + raise ValueError(f"Hard need not satisfied (no entry): {hard_need}") + if not self._dependencies[hard_need]: + raise ValueError(f"Hard need not satisfied (empty): {hard_need}") + + self._dependencies[self._make_depspec_tuple(".internal", "completed")] = set( + self.recipes + ) + return list(toposort.toposort(self._dependencies)) + + +def recipe_name_getter(c: typing.Type["Recipe"]) -> Optional[str]: + if hasattr(c, "_NAME"): + return c._NAME # type: ignore + return None + + +class Recipe: + def __init__(self, host: str, slug: str, args: dict, head: "Head"): + self._host = host + self._slug = slug + self._args = args + + def get_host(self): + return self._host + + def get_tweak(self, name: str, default: Any) -> Any: + dotname = f".{name}" + if dotname in self._args: + return self._args[dotname] + elif ".m" in self._args and name in self._args[".m"]: + return self._args[".m"][name] + else: + return default + + def get_user(self, head: "Head") -> str: + user = self.get_tweak("user", head.souss[self._host]["user"]) + assert isinstance(user, str) + return user + + @classmethod + def from_menu(cls, host: str, slug: str, args: dict, head: "Head") -> "Recipe": + return cls(host, slug, args, head) + + def prepare(self, preparation: Preparation, head: "Head") -> None: + preparation.needs("os-user", self.get_user(head)) + + # TODO(feature) allow merging per-task and per-menu tweaks + # TODO(feature) allow need/provide custom things, not just user-units + + afters = self.get_tweak("needs", None) + if afters: + for after in afters: + if isinstance(after, list) and len(after) == 2: + # allow requesting custom needs + preparation.needs(after[0], after[1]) + continue + if not isinstance(after, str): + raise ValueError("needs tweak should be list of strings or pairs.") + preparation.needs("user-unit", after) + + befores = self.get_tweak("provides", None) + if befores: + if isinstance(befores, str): + preparation.provides("user-unit", befores) + else: + for before in befores: + if not isinstance(before, str): + raise ValueError("provides tweak should be list of strings.") + preparation.provides("user-unit", before) + + async def cook(self, kitchen: "Kitchen") -> None: + raise NotImplementedError + + def __str__(self): + cls = self.__class__ + if hasattr(cls, "RECIPE_NAME"): + return ( + f"{cls.RECIPE_NAME}({cls.__name__}) {self._slug} " # type: ignore + f"on {self._host} ({self._args})" + ) + else: + return f"{cls.__name__} {self._slug} on {self._host} ({self._args})" diff --git a/scone/head/secrets.py b/scone/head/secrets.py new file mode 100644 index 0000000..1a01793 --- /dev/null +++ b/scone/head/secrets.py @@ -0,0 +1,102 @@ +from typing import Optional + +import nacl +import nacl.utils +import secretstorage +from nacl.encoding import RawEncoder, URLSafeBase64Encoder +from nacl.secret import SecretBox + +from scone.common.misc import eprint + + +class SecretAccess: + def __init__(self, restaurant_identifier: Optional[str]): + self.restaurant_identifier = restaurant_identifier + self.key: Optional[bytes] = None + + def encrypt_bytes(self, data: bytes, encoder=RawEncoder) -> bytes: + box = SecretBox(self.key) + return box.encrypt(data, None, encoder) + + def decrypt_bytes(self, data: bytes, encoder=RawEncoder) -> bytes: + box = SecretBox(self.key) + return box.decrypt(data, None, encoder) + + def generate_new(self): + eprint("Generating a new freezer key...") + self.key = nacl.utils.random(SecretBox.KEY_SIZE) + key_b64 = URLSafeBase64Encoder.encode(self.key) + eprint("Your new key is: " + key_b64.decode()) + eprint("Pretty please store it in a safe place!") + + if not self.restaurant_identifier: + eprint("No RI; not saving to SS") + return + + eprint("Attempting to save it to the secret service...") + eprint("(save it yourself anyway!)") + + with secretstorage.dbus_init() as connection: + collection = secretstorage.get_default_collection(connection) + attributes = { + "application": "Scone", + "restaurant": self.restaurant_identifier, + } + items = list(collection.search_items(attributes)) + if items: + eprint( + "Found secret sauce for this Restaurant already!" + " Will not overwrite." + ) + else: + eprint("Storing secret sauce for this Restaurant...") + collection.create_item( + f"scone({self.restaurant_identifier}): secret sauce", + attributes, + key_b64, + ) + eprint("OK!") + + def get_existing(self): + if self.restaurant_identifier is not None: + self.key = self._try_dbus_auth(self.restaurant_identifier) + else: + self.key = self._try_manual_entry() + + def _try_dbus_auth(self, restaurant_identifier: str) -> Optional[bytes]: + eprint("Trying D-Bus Secret Service") + try: + with secretstorage.dbus_init() as connection: + collection = secretstorage.get_default_collection(connection) + attributes = { + "application": "Scone", + "restaurant": restaurant_identifier, + } + items = list(collection.search_items(attributes)) + if items: + eprint("Found secret sauce for this Restaurant, unlocking…") + items[0].unlock() + return URLSafeBase64Encoder.decode(items[0].get_secret()) + else: + eprint("Did not find secret sauce for this Restaurant.") + eprint("Enter it and I will try and store it...") + secret = self._try_manual_entry() + if secret is not None: + collection.create_item( + f"scone({restaurant_identifier}): secret sauce", + attributes, + URLSafeBase64Encoder.encode(secret), + ) + return secret + return None + except EOFError: # XXX what happens with no D-Bus + return None + + def _try_manual_entry(self) -> Optional[bytes]: + eprint("Manual entry required. Enter password for this restaurant: ", end="") + key = URLSafeBase64Encoder.decode(input().encode()) + if len(key) != SecretBox.KEY_SIZE: + eprint("Wrong size!") + return None + else: + return key diff --git a/scone/head/sshconn.py b/scone/head/sshconn.py new file mode 100644 index 0000000..2a73fff --- /dev/null +++ b/scone/head/sshconn.py @@ -0,0 +1,63 @@ +import logging +from typing import Optional, Tuple + +import asyncssh +from asyncssh import SSHClientConnection, SSHClientConnectionOptions, SSHClientProcess + +from scone.common.chanpro import Channel, ChanPro + +logger = logging.getLogger(__name__) + + +class AsyncSSHChanPro(ChanPro): + def __init__(self, connection: SSHClientConnection, process: SSHClientProcess): + super(AsyncSSHChanPro, self).__init__(process.stdout, process.stdin) + self._process = process + self._connection = connection + + async def close(self) -> None: + await super(AsyncSSHChanPro, self).close() + await self._process.close() + await self._connection.close() + + +async def open_ssh_sous( + host: str, + user: str, + client_key: Optional[str], + requested_user: str, + sous_command: str, + debug_logging: bool = False +) -> Tuple[ChanPro, Channel]: + if client_key: + opts = SSHClientConnectionOptions(username=user, client_keys=[client_key]) + else: + opts = SSHClientConnectionOptions(username=user) + + conn: SSHClientConnection = await asyncssh.connect(host, options=opts) + + if requested_user != user: + command = f"sudo -u {requested_user} {sous_command}" + else: + command = sous_command + + if debug_logging: + command = f"tee /tmp/sconnyin-{requested_user} | {command} 2>/tmp/sconnyerr-{requested_user} " \ + f"| tee /tmp/sconnyout-{requested_user}" + + process: SSHClientProcess = await conn.create_process(command, encoding=None) + + logger.debug("Constructing AsyncSSHChanPro...") + cp = AsyncSSHChanPro(conn, process) + logger.debug("Creating root channel...") + ch = cp.new_channel(number=0, desc="Root channel") + cp.start_listening_to_channels(default_route=None) + logger.debug("Sending head hello...") + await ch.send({"hello": "head"}) + logger.debug("Waiting for sous hello...") + sous_hello = await ch.recv() + logger.debug("Got sous hello... checking") + assert isinstance(sous_hello, dict) + assert sous_hello["hello"] == "sous" + logger.debug("Valid sous hello...") + return cp, ch diff --git a/scone/head/utils.py b/scone/head/utils.py new file mode 100644 index 0000000..a7499da --- /dev/null +++ b/scone/head/utils.py @@ -0,0 +1,27 @@ +from typing import Any, Optional, Type, TypeVar, cast + +from typeguard import check_type as check_typeguard + +_A = TypeVar("_A") + + +def check_type(value: Any, check_type: Type[_A], name: str = "value") -> _A: + check_typeguard(name, value, check_type) + # if not isinstance(value, check_type): + # raise TypeError(f"Not of type {check_type}") + return cast(_A, value) + + +def check_type_opt( + value: Any, check_type: Type[_A], name: str = "value" +) -> Optional[_A]: + check_typeguard(name, value, Optional[check_type]) + # if not isinstance(value, check_type): + # raise TypeError(f"Not of type {check_type}") + return cast(_A, value) + + +def check_type_adv(value: Any, check_type: Any, name: str = "value") -> Any: + # permitted to use special forms with this one + check_typeguard(name, value, check_type) + return value diff --git a/scone/head/variables.py b/scone/head/variables.py new file mode 100644 index 0000000..f291303 --- /dev/null +++ b/scone/head/variables.py @@ -0,0 +1,169 @@ +from enum import Enum +from typing import Any, Dict, List, NamedTuple + +ExpressionPart = NamedTuple("ExpressionPart", [("kind", str), ("value", str)]) + + +def flatten_dict(nested: Dict[str, Any]) -> Dict[str, Any]: + for key in nested: + if not isinstance(key, str): + # not possible to flatten + return nested + + flat = {} + + for key, value in nested.items(): + if isinstance(value, dict) and value: + sub_flat = flatten_dict(value) + for k in sub_flat: + if not isinstance(k, str): + flat[key] = value + break + else: + # can flatten + for k, v in sub_flat.items(): + flat[f"{key}.{k}"] = v + else: + flat[key] = value + + return flat + + +class ExprParsingState(Enum): + NORMAL = 1 + DOLLAR = 2 + VARIABLE_NAME = 3 + + +def parse_expr(expr: str) -> List[ExpressionPart]: + state = ExprParsingState.NORMAL + buffer = "" + out = [] + for char in expr: + if state == ExprParsingState.NORMAL: + if char == "$": + state = ExprParsingState.DOLLAR + else: + buffer += char + elif state == ExprParsingState.DOLLAR: + if char == "$": + # escaped dollar sign + buffer += "$" + state = ExprParsingState.NORMAL + elif char == "{": + state = ExprParsingState.VARIABLE_NAME + if buffer: + out.append(ExpressionPart("literal", buffer)) + buffer = "" + else: + buffer += "$" + char + state = ExprParsingState.NORMAL + elif state == ExprParsingState.VARIABLE_NAME: + if char == "}": + state = ExprParsingState.NORMAL + out.append(ExpressionPart("variable", buffer)) + buffer = "" + else: + buffer += char + + if state != ExprParsingState.NORMAL: + raise ValueError(f"Wrong end state: {state}") + + if buffer: + out.append(ExpressionPart("literal", buffer)) + + return out + + +def merge_right_into_left_inplace(left: dict, right: dict): + for key, value in right.items(): + if isinstance(value, dict) and key in left and isinstance(left[key], dict): + merge_right_into_left_inplace(left[key], value) + else: + left[key] = value + + +class Variables: + def __init__(self): + self._vars: Dict[str, Any] = {} + + def get_dotted(self, name: str) -> Any: + current = self._vars + keys = name.split(".") + try: + for k in keys: + current = current[k] + return current + except KeyError: + raise KeyError("No variable: " + name) + + def has_dotted(self, name: str) -> bool: + try: + self.get_dotted(name) + return True + except KeyError: + return False + + def set_dotted(self, name: str, value: Any): + current = self._vars + keys = name.split(".") + for k in keys[:-1]: + if k not in current: + current[k] = {} + current = current[k] + current[keys[-1]] = value + + def _eval_with_incoming(self, expr: str, incoming: Dict[str, str]) -> Any: + parsed = parse_expr(expr) + + if len(parsed) == 1 and parsed[0].kind == "variable": + var_name = parsed[0].value + if self.has_dotted(var_name): + return self.get_dotted(var_name) + elif var_name in incoming: + sub_expr = incoming.pop(var_name) + sub_val = self._eval_with_incoming(sub_expr, incoming) + self.set_dotted(var_name, sub_val) + return sub_val + else: + raise KeyError(f"No variable '{incoming}'") + + out = "" + for part in parsed: + if part.kind == "literal": + out += part.value + elif part.kind == "variable": + var_name = parsed[0].value + if self.has_dotted(var_name): + out += str(self.get_dotted(var_name)) + elif var_name in incoming: + sub_expr = incoming.pop(var_name) + sub_val = self._eval_with_incoming(sub_expr, incoming) + self.set_dotted(var_name, sub_val) + out += str(sub_val) + else: + raise KeyError(f"No variable '{incoming}'") + return out + + def load_vars_with_substitutions(self, incoming: Dict[str, Any]): + incoming = flatten_dict(incoming) + while incoming: + key, expr = incoming.popitem() + value = self._eval_with_incoming(expr, incoming) + self.set_dotted(key, value) + + def eval(self, expr: str) -> Any: + return self._eval_with_incoming(expr, {}) + + def load_plain(self, incoming: Dict[str, Any]): + merge_right_into_left_inplace(self._vars, incoming) + + def substitute_inplace_in_dict(self, dictionary: Dict[str, Any]): + for k, v in dictionary.items(): + if isinstance(v, dict): + self.substitute_inplace_in_dict(v) + elif isinstance(v, str): + dictionary[k] = self.eval(v) + + def toplevel(self): + return self._vars diff --git a/scone/sous/__init__.py b/scone/sous/__init__.py new file mode 100644 index 0000000..7292cc5 --- /dev/null +++ b/scone/sous/__init__.py @@ -0,0 +1,27 @@ +from os import path + +import toml + +from scone.common.loader import ClassLoader +from scone.sous.utensils import Utensil, utensil_namer + + +class Sous: + def __init__(self, ut_loader: ClassLoader[Utensil]): + self.utensil_loader = ut_loader + + @staticmethod + def open(directory: str): + with open(path.join(directory, "scone.sous.toml")) as sous_toml: + sous_data = toml.load(sous_toml) + + utensil_module_roots = sous_data.get( + "utensil_roots", ["scone.default.utensils"] + ) + + # load available recipes + loader: ClassLoader[Utensil] = ClassLoader(Utensil, utensil_namer) + for package_root in utensil_module_roots: + loader.add_package_root(package_root) + + return Sous(loader) diff --git a/scone/sous/__main__.py b/scone/sous/__main__.py new file mode 100644 index 0000000..b242c1f --- /dev/null +++ b/scone/sous/__main__.py @@ -0,0 +1,101 @@ +import asyncio +import logging +import os +import pwd +import sys +from pathlib import Path +from typing import List, cast + +import cattr + +from scone.common.chanpro import Channel, ChanPro +from scone.common.pools import Pools +from scone.sous import Sous, Utensil +from scone.sous.utensils import Worktop + +logger = logging.getLogger(__name__) + + +async def main(args: List[str]): + # loop = asyncio.get_event_loop() + # reader = asyncio.StreamReader() + # reader_protocol = asyncio.StreamReaderProtocol(reader) + # await loop.connect_read_pipe(lambda: reader_protocol, sys.stdin.buffer) + # + # writer_transport, writer_protocol = await loop.connect_write_pipe( + # FlowControlMixin, sys.stdout.buffer) + # writer = StreamWriter(writer_transport, writer_protocol, None, loop) + # data = await reader.readexactly(5) + # + # writer.write(data) + # await writer.drain() + + logging.basicConfig(level=logging.DEBUG) + + if len(args) < 1: + raise RuntimeError("Needs to be passed a sous config directory as 1st arg!") + + sous = Sous.open(args[0]) + logger.debug("Sous created") + + cp = await ChanPro.open_from_stdio() + root = cp.new_channel(0, "Root channel") + cp.start_listening_to_channels(default_route=root) + + await root.send({"hello": "sous"}) + + remote_hello = await root.recv() + assert isinstance(remote_hello, dict) + assert remote_hello["hello"] == "head" + + sous_user = pwd.getpwuid(os.getuid()).pw_name + + quasi_pers = Path(args[0], "worktop", sous_user) + + if not quasi_pers.exists(): + quasi_pers.mkdir(parents=True) + + worktop = Worktop(quasi_pers, Pools()) + + logger.info("Worktop dir is: %s", worktop.dir) + + while True: + try: + message = await root.recv() + except EOFError: + break + if "nc" in message: + # start a new command channel + channel_num = message["nc"] + command = message["cmd"] + payload = message["pay"] + + utensil_class = sous.utensil_loader.get_class(command) + utensil = cast(Utensil, cattr.structure(payload, utensil_class)) + + channel = cp.new_channel(channel_num, command) + + logger.debug("going to sched task with %r", utensil) + + asyncio.create_task(run_utensil(utensil, channel, worktop)) + elif "lost" in message: + # for a then-non-existent channel, but probably just waiting on us + # retry without a default route. + await cp.handle_incoming_message(message["lost"]) + else: + raise RuntimeError(f"Unknown ch0 message {message}") + + +async def run_utensil(utensil: Utensil, channel: Channel, worktop: Worktop): + try: + await utensil.execute(channel, worktop) + except Exception: + logger.error("Unhandled Exception in Utensil", exc_info=True) + await channel.close("Exception in utensil") + else: + logger.debug("Utensil finished with normal reason") + await channel.close("Utensil complete") + + +if __name__ == "__main__": + asyncio.get_event_loop().run_until_complete(main(sys.argv[1:])) diff --git a/scone/sous/utensils.py b/scone/sous/utensils.py new file mode 100644 index 0000000..273ac13 --- /dev/null +++ b/scone/sous/utensils.py @@ -0,0 +1,26 @@ +from pathlib import Path +from typing import Type, TypeVar + +from scone.common.chanpro import Channel +from scone.common.pools import Pools + +T = TypeVar("T") + + +class Worktop: + def __init__(self, dir: Path, pools: Pools): + # mostly-persistent worktop space for utensils + self.dir = dir + self.pools = pools + + +class Utensil: + def __init__(self): + pass + + async def execute(self, channel: Channel, worktop: Worktop): + raise NotImplementedError + + +def utensil_namer(c: Type) -> str: + return f"{c.__module__}.{c.__name__}" diff --git a/scripts-dev/lint.sh b/scripts-dev/lint.sh new file mode 100755 index 0000000..e006274 --- /dev/null +++ b/scripts-dev/lint.sh @@ -0,0 +1,18 @@ +#!/bin/sh -eu + +if [ $# -ge 1 ] +then + files=$* +else + files="scone tests" +fi + +echo "Linting these locations: $files" +echo " ===== Running isort ===== " +isort $files +echo " ===== Running black ===== " +black $files +echo " ===== Running flake8 ===== " +flake8 $files +echo " ===== Running mypy ===== " +mypy $files diff --git a/setup.cfg b/setup.cfg new file mode 100644 index 0000000..549f8ad --- /dev/null +++ b/setup.cfg @@ -0,0 +1,21 @@ +[flake8] +# line length defaulted to by black +max-line-length = 88 + +# see https://pycodestyle.readthedocs.io/en/latest/intro.html#error-codes +# for error codes. The ones we ignore are: +# W503: line break before binary operator +# W504: line break after binary operator +# E203: whitespace before ':' (which is contrary to pep8?) +# (this is a subset of those ignored in Synapse) +ignore=W503,W504,E203 + +[isort] +line_length = 88 +sections=FUTURE,STDLIB,THIRDPARTY,FIRSTPARTY,TESTS,LOCALFOLDER +default_section=THIRDPARTY +known_first_party=scone +known_tests=tests +multi_line_output=3 +include_trailing_comma=true +combine_as_imports=true diff --git a/setup.py b/setup.py new file mode 100644 index 0000000..0c0423b --- /dev/null +++ b/setup.py @@ -0,0 +1,158 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- + +# Note: To use the 'upload' functionality of this file, you must: +# $ pipenv install twine --dev + +import io +import os +import sys +from shutil import rmtree + +from setuptools import find_packages, setup, Command + +# Package meta-data. +NAME = 'scone' +DESCRIPTION = 'Simple CONfiguration Engine' +URL = 'https://librepush.net/project/scone' +EMAIL = 'rei@librepush.net' +AUTHOR = 'Olivier \'reivilibre\'' +REQUIRES_PYTHON = '>=3.7.0' +VERSION = '0.1.0' + +# What packages are required for this module to be executed? +REQUIRED = [ + "cbor2~=5.1.1", + "setuptools~=49.1.2", + "toml~=0.10.1", + "attrs~=19.3.0", + "cattrs~=1.0.0", + "canonicaljson~=1.2.0" +] + + +EX_SOUS_BASE = [] +EX_SOUS_PG = ["asyncpg"] + +EX_SOUS_ALL = EX_SOUS_BASE + EX_SOUS_PG + + + +# What packages are optional? +EXTRAS = { + "head": [ + "SecretStorage~=3.1.2", + "asyncssh[libnacl]~=2.2.1", + "toposort~=1.5", + "pynacl", + "aiosqlite~=0.15.0", + "requests", + "Jinja2", + "typeguard" + ], + "sous": EX_SOUS_ALL, + "sous-core": EX_SOUS_BASE, + "sous-pg": EX_SOUS_PG +} + +# The rest you shouldn't have to touch too much :) +# ------------------------------------------------ +# Except, perhaps the License and Trove Classifiers! +# If you do change the License, remember to change the Trove Classifier for that! + +here = os.path.abspath(os.path.dirname(__file__)) + +# Import the README and use it as the long-description. +# Note: this will only work if 'README.md' is present in your MANIFEST.in file! +try: + with io.open(os.path.join(here, 'README.md'), encoding='utf-8') as f: + long_description = '\n' + f.read() +except FileNotFoundError: + long_description = DESCRIPTION + +# Load the package's __version__.py module as a dictionary. +about = {} +if not VERSION: + project_slug = NAME.lower().replace("-", "_").replace(" ", "_") + with open(os.path.join(here, project_slug, '__version__.py')) as f: + exec(f.read(), about) +else: + about['__version__'] = VERSION + + +class UploadCommand(Command): + """Support setup.py upload.""" + + description = 'Build and publish the package.' + user_options = [] + + @staticmethod + def status(s): + """Prints things in bold.""" + print('\033[1m{0}\033[0m'.format(s)) + + def initialize_options(self): + pass + + def finalize_options(self): + pass + + def run(self): + try: + self.status('Removing previous builds…') + rmtree(os.path.join(here, 'dist')) + except OSError: + pass + + self.status('Building Source and Wheel (universal) distribution…') + os.system('{0} setup.py sdist bdist_wheel --universal'.format(sys.executable)) + + self.status('Uploading the package to PyPI via Twine…') + os.system('twine upload dist/*') + + self.status('Pushing git tags…') + os.system('git tag v{0}'.format(about['__version__'])) + os.system('git push --tags') + + sys.exit() + + +# Where the magic happens: +setup( + name=NAME, + version=about['__version__'], + description=DESCRIPTION, + long_description=long_description, + long_description_content_type='text/markdown', + author=AUTHOR, + author_email=EMAIL, + python_requires=REQUIRES_PYTHON, + url=URL, + packages=find_packages(exclude=["tests", "*.tests", "*.tests.*", "tests.*"]), + # If your package is a single module, use this instead of 'packages': + # py_modules=['mypackage'], + + entry_points={ + 'console_scripts': [ + 'scone=scone.head.cli:cli', + 'scone-freezer=scone.head.cli.freezer:cli' + ], + }, + install_requires=REQUIRED, + extras_require=EXTRAS, + include_package_data=True, + # TODO license='GPL3', + classifiers=[ + # Trove classifiers + # Full list: https://pypi.python.org/pypi?%3Aaction=list_classifiers + 'Programming Language :: Python', + 'Programming Language :: Python :: 3', + 'Programming Language :: Python :: 3.7', + 'Programming Language :: Python :: Implementation :: CPython', + 'Programming Language :: Python :: Implementation :: PyPy' + ], + # $ setup.py publish support. + cmdclass={ + 'upload': UploadCommand, + }, +) \ No newline at end of file diff --git a/tox.ini b/tox.ini new file mode 100644 index 0000000..51e8d10 --- /dev/null +++ b/tox.ini @@ -0,0 +1,34 @@ +[tox] +envlist = py37, check_codestyle, check_types + +[testenv] + +# As of twisted 16.4, trial tries to import the tests as a package (previously +# it loaded the files explicitly), which means they need to be on the +# pythonpath. Our sdist doesn't include the 'tests' package, so normally it +# doesn't work within the tox virtualenv. +# +# As a workaround, we tell tox to do install with 'pip -e', which just +# creates a symlink to the project directory instead of unpacking the sdist. +usedevelop=true + +commands = trial tests + +[testenv:check_codestyle] +skip_install = True +deps = + # pin deps so we don't start failing when they are updated + flake8==3.8.3 + black==19.10b0 + isort~=5.0 +commands = + flake8 . + black --check --diff . + isort --check-only --diff . + +[testenv:check_types] +deps = + mypy==0.780 + mypy-zope==0.2.7 +commands = + mypy .