From 58af07bbb2ff52f5095406e97b3b86a1cedac626 Mon Sep 17 00:00:00 2001 From: Olivier Date: Wed, 7 Oct 2020 21:11:48 +0100 Subject: [PATCH] New DAG execution model, works! But no dedupe yet --- .gitignore | 5 + mypy.ini | 6 + scone/__main__.py | 4 +- scone/default/recipes/apt.py | 50 ++- scone/default/recipes/filesystem.py | 65 ++-- scone/default/recipes/fridge.py | 39 +- scone/default/recipes/linux.py | 44 ++- scone/default/recipes/postgres.py | 18 +- scone/default/recipes/python.py | 33 +- scone/default/recipes/systemd.py | 49 +-- scone/default/steps/basic_steps.py | 6 +- scone/default/steps/fridge_steps.py | 3 +- scone/head/__init__.py | 201 ---------- scone/head/cli/__init__.py | 78 ++-- scone/head/dag.py | 193 ++++++++++ scone/head/dependency_tracking.py | 315 +++++----------- scone/head/dot_emitter.py | 57 +++ scone/head/grammar/scoml.tx | 163 +++++++++ scone/head/head.py | 209 +++++++++++ scone/head/kitchen.py | 322 ++++++++++++---- scone/head/menu_reader.py | 546 +++++++++++++++++++++++----- scone/head/recipe.py | 175 ++------- scone/head/sshconn.py | 9 +- scone/head/variables.py | 9 +- setup.py | 3 +- 25 files changed, 1687 insertions(+), 915 deletions(-) create mode 100644 .gitignore create mode 100644 scone/head/dag.py create mode 100644 scone/head/dot_emitter.py create mode 100644 scone/head/grammar/scoml.tx create mode 100644 scone/head/head.py diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..0eae31a --- /dev/null +++ b/.gitignore @@ -0,0 +1,5 @@ +/.idea +__pycache__ +/scone.egg-info +/dist + diff --git a/mypy.ini b/mypy.ini index 9f4616d..1bb94ba 100644 --- a/mypy.ini +++ b/mypy.ini @@ -21,3 +21,9 @@ ignore_missing_imports = True [mypy-asyncpg] ignore_missing_imports = True + +[mypy-frozendict] +ignore_missing_imports = True + +[mypy-textx] +ignore_missing_imports = True diff --git a/scone/__main__.py b/scone/__main__.py index c12ff05..bc4bca2 100644 --- a/scone/__main__.py +++ b/scone/__main__.py @@ -3,7 +3,9 @@ # import sys # from typing import List # -# from scone.head import Head, Recipe +# from scone.head.head import Head +# from scone.head.recipe import Recipe + # from scone.head.kitchen import Kitchen # from scone.head.recipe import Preparation diff --git a/scone/default/recipes/apt.py b/scone/default/recipes/apt.py index 8b50f22..603cc3c 100644 --- a/scone/default/recipes/apt.py +++ b/scone/default/recipes/apt.py @@ -1,9 +1,9 @@ from typing import Dict, List, Set, Tuple from scone.default.utensils.basic_utensils import SimpleExec -from scone.head import Head, Recipe -from scone.head.kitchen import Kitchen -from scone.head.recipe import Preparation +from scone.head.head import Head +from scone.head.kitchen import Kitchen, Preparation +from scone.head.recipe import Recipe, RecipeContext from scone.head.utils import check_type @@ -17,8 +17,8 @@ class AptInstallInternal(Recipe): # TODO(extension, low): expand this into apt-install-now if we need # the flexibility - def __init__(self, host: str, slug: str, args: dict, head: "Head"): - super().__init__(host, slug, args, head) + def __init__(self, recipe_context: RecipeContext, args: dict, head): + super().__init__(recipe_context, args, head) self.packages: Set[str] = set() @@ -66,23 +66,37 @@ class AptPackage(Recipe): internal_installers: Dict[Tuple[Head, str], AptInstallInternal] = {} - def __init__(self, host: str, slug: str, args: dict, head: Head): - super().__init__(host, slug, args, head) + def __init__(self, recipe_context: RecipeContext, args: dict, head): + super().__init__(recipe_context, args, head) self.packages: List[str] = check_type(args["packages"], list) def prepare(self, preparation: Preparation, head: Head) -> None: super().prepare(preparation, head) - pair = (head, self.get_host()) - if pair not in AptPackage.internal_installers: - install_internal = AptInstallInternal(self.get_host(), "internal", {}, head) - AptPackage.internal_installers[pair] = install_internal - preparation.subrecipe(install_internal) - preparation.provides("apt-stage", "internal-install-packages") - internal_installer = AptPackage.internal_installers.get(pair) - assert internal_installer is not None - internal_installer.packages.update(self.packages) + for package in self.packages: + preparation.provides("apt-package", package) async def cook(self, kitchen: Kitchen) -> None: - # can't be tracked - kitchen.get_dependency_tracker().ignore() + # this is a one-off task assuming everything works + kitchen.get_dependency_tracker() + + if self.packages: + update = await kitchen.ut1areq( + SimpleExec(["apt-get", "-yq", "update"], "/"), SimpleExec.Result + ) + if update.exit_code != 0: + raise RuntimeError( + f"apt update failed with err {update.exit_code}: {update.stderr!r}" + ) + + install_args = ["apt-get", "-yq", "install"] + install_args += list(self.packages) + install = await kitchen.ut1areq( + SimpleExec(install_args, "/"), SimpleExec.Result + ) + + if install.exit_code != 0: + raise RuntimeError( + f"apt install failed with err {install.exit_code}:" + f" {install.stderr!r}" + ) diff --git a/scone/default/recipes/filesystem.py b/scone/default/recipes/filesystem.py index 0750118..f874a7d 100644 --- a/scone/default/recipes/filesystem.py +++ b/scone/default/recipes/filesystem.py @@ -12,9 +12,9 @@ from scone.default.utensils.basic_utensils import ( Stat, ) from scone.default.utensils.dynamic_dependencies import HasChangedInSousStore -from scone.head import Head, Recipe -from scone.head.kitchen import Kitchen -from scone.head.recipe import Preparation +from scone.head.head import Head +from scone.head.kitchen import Kitchen, Preparation +from scone.head.recipe import Recipe, RecipeContext from scone.head.utils import check_type, check_type_opt @@ -28,7 +28,7 @@ class DeclareFile(Recipe): _NAME = "declare-file" def prepare(self, preparation: Preparation, head: Head): - preparation.provides("file", self._args["path"]) + preparation.provides("file", self.arguments["path"]) async def cook(self, kitchen: Kitchen): # mark as tracked. @@ -45,7 +45,7 @@ class DeclareDirectory(Recipe): _NAME = "declare-dir" def prepare(self, preparation: Preparation, head: Head): - preparation.provides("directory", self._args["path"]) + preparation.provides("directory", self.arguments["path"]) async def cook(self, kitchen: Kitchen): # mark as tracked. @@ -59,8 +59,8 @@ class EnsureDirectory(Recipe): _NAME = "directory" - def __init__(self, host: str, slug: str, args: dict, head: "Head"): - super().__init__(host, slug, args, head) + def __init__(self, recipe_context: RecipeContext, args: dict, head): + super().__init__(recipe_context, args, head) parents = args.get("parents", 0) assert isinstance(parents, int) @@ -74,7 +74,7 @@ class EnsureDirectory(Recipe): self.parents = parents self.mode = parse_mode(mode, directory=True) self._make: List[str] = [] - self.targ_user = args.get("owner", self.get_user(head)) + self.targ_user = args.get("owner", recipe_context.user) self.targ_group = args.get("group", self.targ_user) def prepare(self, preparation: Preparation, head: "Head"): @@ -123,8 +123,8 @@ class ExtractTar(Recipe): _NAME = "tar-extract" - def __init__(self, host: str, slug: str, args: dict, head: "Head"): - super().__init__(host, slug, args, head) + def __init__(self, recipe_context: RecipeContext, args: dict, head): + super().__init__(recipe_context, args, head) self.tar = check_type(args.get("tar"), str) self.dir = check_type(args.get("dir"), str) @@ -165,8 +165,8 @@ class RunScript(Recipe): _NAME = "script-run" - def __init__(self, host: str, slug: str, args: dict, head: "Head"): - super().__init__(host, slug, args, head) + def __init__(self, recipe_context: RecipeContext, args: dict, head): + super().__init__(recipe_context, args, head) self.working_dir = check_type(args.get("working_dir"), str) @@ -196,8 +196,8 @@ class CommandOnChange(Recipe): _NAME = "command-on-change" - def __init__(self, host: str, slug: str, args: dict, head: Head): - super().__init__(host, slug, args, head) + def __init__(self, recipe_context: RecipeContext, args: dict, head): + super().__init__(recipe_context, args, head) self.purpose = check_type(args.get("purpose"), str) self.command = check_type(args.get("command"), list) @@ -232,8 +232,8 @@ class GitCheckout(Recipe): # declare SAFE_TO_SKIP. Perhaps we want to stop that unless you opt out? # But oh well for now. - def __init__(self, host: str, slug: str, args: dict, head: Head): - super().__init__(host, slug, args, head) + def __init__(self, recipe_context: RecipeContext, args: dict, head): + super().__init__(recipe_context, args, head) self.repo_src = check_type(args.get("src"), str) self.dest_dir = check_type(args.get("dest"), str) @@ -270,9 +270,7 @@ class GitCheckout(Recipe): stat = await k.ut1a(Stat(self.dest_dir), Stat.Result) if stat is None: # doesn't exist; git init it - await exec_no_fails( - k, ["git", "init", self.dest_dir], "/" - ) + await exec_no_fails(k, ["git", "init", self.dest_dir], "/") stat = await k.ut1a(Stat(self.dest_dir), Stat.Result) if stat is None: @@ -283,29 +281,30 @@ class GitCheckout(Recipe): # add the remote, removing it first to ensure it's what we want # don't care if removing fails - await k.ut1areq(SimpleExec(["git", "remote", "remove", "scone"], self.dest_dir), SimpleExec.Result) + await k.ut1areq( + SimpleExec(["git", "remote", "remove", "scone"], self.dest_dir), + SimpleExec.Result, + ) await exec_no_fails( k, ["git", "remote", "add", "scone", self.repo_src], self.dest_dir ) # fetch the latest from the remote - await exec_no_fails( - k, ["git", "fetch", "scone"], self.dest_dir - ) + await exec_no_fails(k, ["git", "fetch", "scone"], self.dest_dir) # figure out what ref we want to use # TODO(performance): fetch only this ref? ref = self.ref or f"scone/{self.branch}" # switch to that ref - await exec_no_fails( - k, ["git", "switch", "--detach", ref], self.dest_dir - ) + await exec_no_fails(k, ["git", "switch", "--detach", ref], self.dest_dir) # if we use submodules if self.submodules: await exec_no_fails( - k, ["git", "submodule", "update", "--init", "--recursive"], self.dest_dir + k, + ["git", "submodule", "update", "--init", "--recursive"], + self.dest_dir, ) for expected in self.expect: @@ -313,10 +312,16 @@ class GitCheckout(Recipe): # TODO(performance, low): parallelise these stat = await k.ut1a(Stat(expected_path_str), Stat.Result) if not stat: - raise RuntimeError(f"expected {expected_path_str} to exist but it did not") + raise RuntimeError( + f"expected {expected_path_str} to exist but it did not" + ) if stat.dir and not expected.endswith("/"): - raise RuntimeError(f"expected {expected_path_str} to exist as a file but it is a dir") + raise RuntimeError( + f"expected {expected_path_str} to exist as a file but it is a dir" + ) if not stat.dir and expected.endswith("/"): - raise RuntimeError(f"expected {expected_path_str} to exist as a dir but it is a file") + raise RuntimeError( + f"expected {expected_path_str} to exist as a dir but it is a file" + ) diff --git a/scone/default/recipes/fridge.py b/scone/default/recipes/fridge.py index c43ab04..72ac0d2 100644 --- a/scone/default/recipes/fridge.py +++ b/scone/default/recipes/fridge.py @@ -8,11 +8,15 @@ from urllib.request import urlretrieve from scone.common.misc import sha256_file from scone.common.modeutils import DEFAULT_MODE_FILE, parse_mode from scone.default.steps import fridge_steps -from scone.default.steps.fridge_steps import FridgeMetadata, load_and_transform, SUPERMARKET_RELATIVE -from scone.default.utensils.basic_utensils import WriteFile, Chown -from scone.head import Head -from scone.head.kitchen import Kitchen -from scone.head.recipe import Preparation, Recipe +from scone.default.steps.fridge_steps import ( + SUPERMARKET_RELATIVE, + FridgeMetadata, + load_and_transform, +) +from scone.default.utensils.basic_utensils import Chown, WriteFile +from scone.head.head import Head +from scone.head.kitchen import Kitchen, Preparation +from scone.head.recipe import Recipe, RecipeContext from scone.head.utils import check_type @@ -23,8 +27,9 @@ class FridgeCopy(Recipe): _NAME = "fridge-copy" - def __init__(self, host: str, slug: str, args: dict, head: Head): - super().__init__(host, slug, args, head) + def __init__(self, recipe_context: RecipeContext, args: dict, head: Head): + super().__init__(recipe_context, args, head) + fp = fridge_steps.search_in_fridge(head, args["src"]) if fp is None: raise ValueError(f"Cannot find {args['src']} in the fridge.") @@ -44,7 +49,7 @@ class FridgeCopy(Recipe): mode = args.get("mode", DEFAULT_MODE_FILE) assert isinstance(mode, str) or isinstance(mode, int) - self.fridge_path: str = args["src"] + self.fridge_path: str = check_type(args["src"], str) self.real_path: Path = fp self.fridge_meta: FridgeMetadata = meta self.mode = parse_mode(mode, directory=False) @@ -56,7 +61,7 @@ class FridgeCopy(Recipe): async def cook(self, k: Kitchen) -> None: data = await load_and_transform( - k, self.fridge_meta, self.real_path, self.get_host() + k, self.fridge_meta, self.real_path, self.recipe_context.sous ) dest_str = str(self.destination) chan = await k.start(WriteFile(dest_str, self.mode)) @@ -69,9 +74,7 @@ class FridgeCopy(Recipe): # hash_of_data = sha256_bytes(data) # k.get_dependency_tracker().register_remote_file(dest_str, hash_of_data) - await k.get_dependency_tracker().register_fridge_file( - self.fridge_path, self.real_path - ) + k.get_dependency_tracker().register_fridge_file(self.fridge_path) class Supermarket(Recipe): @@ -84,8 +87,8 @@ class Supermarket(Recipe): # dict of target path → future that will complete when it's downloaded in_progress: Dict[str, Future] = dict() - def __init__(self, host: str, slug: str, args: dict, head: "Head"): - super().__init__(host, slug, args, head) + def __init__(self, recipe_context: RecipeContext, args: dict, head): + super().__init__(recipe_context, args, head) self.url = args.get("url") assert isinstance(self.url, str) @@ -101,7 +104,7 @@ class Supermarket(Recipe): else: self.destination = Path(args["dest"]).resolve() - self.owner = check_type(args.get("owner", self.get_user(head)), str) + self.owner = check_type(args.get("owner", self.recipe_context.user), str) self.group = check_type(args.get("group", self.owner), str) mode = args.get("mode", DEFAULT_MODE_FILE) @@ -115,7 +118,9 @@ class Supermarket(Recipe): async def cook(self, kitchen: "Kitchen"): # need to ensure we download only once, even in a race… - supermarket_path = Path(kitchen.head.directory, SUPERMARKET_RELATIVE, self.sha256) + supermarket_path = Path( + kitchen.head.directory, SUPERMARKET_RELATIVE, self.sha256 + ) if self.sha256 in Supermarket.in_progress: await Supermarket.in_progress[self.sha256] @@ -136,7 +141,7 @@ Downloaded by {self} self.url, str(supermarket_path), self.sha256, - note + note, ), ) diff --git a/scone/default/recipes/linux.py b/scone/default/recipes/linux.py index eb16a56..2f360cd 100644 --- a/scone/default/recipes/linux.py +++ b/scone/default/recipes/linux.py @@ -4,9 +4,9 @@ from typing import Optional from scone.default.steps import linux_steps from scone.default.utensils.linux_utensils import GetPasswdEntry -from scone.head import Head, Recipe -from scone.head.kitchen import Kitchen -from scone.head.recipe import Preparation +from scone.head.head import Head +from scone.head.kitchen import Kitchen, Preparation +from scone.head.recipe import Recipe, RecipeContext from scone.head.utils import check_type, check_type_opt logger = logging.getLogger(__name__) @@ -15,12 +15,10 @@ logger = logging.getLogger(__name__) class LinuxUser(Recipe): _NAME = "os-user" - def __init__(self, host: str, slug: str, args: dict, head: Head): - super().__init__(host, slug, args, head) - if slug[0] == "@": - raise ValueError("os-user should be used like [os-user.username].") + def __init__(self, recipe_context: RecipeContext, args: dict, head): + super().__init__(recipe_context, args, head) - self.user_name = slug + self.user_name = check_type(args.get("name"), str) self.make_group = check_type(args.get("make_group", True), bool) self.make_home = check_type(args.get("make_home", True), bool) self.home: Optional[str] = check_type_opt(args.get("home"), str) @@ -62,3 +60,33 @@ class LinuxUser(Recipe): self.make_group, self.home, ) + + +class DeclareLinuxUser(Recipe): + _NAME = "declare-os-user" + + def __init__(self, recipe_context: RecipeContext, args: dict, head): + super().__init__(recipe_context, args, head) + + self.user_name = check_type(args.get("name"), str) + + def prepare(self, preparation: Preparation, head: "Head") -> None: + preparation.provides("os-user", self.user_name) + + async def cook(self, kitchen: Kitchen) -> None: + kitchen.get_dependency_tracker() + + +class DeclareLinuxGroup(Recipe): + _NAME = "declare-os-group" + + def __init__(self, recipe_context: RecipeContext, args: dict, head): + super().__init__(recipe_context, args, head) + + self.name = check_type(args.get("name"), str) + + def prepare(self, preparation: Preparation, head: "Head") -> None: + preparation.provides("os-group", self.name) + + async def cook(self, kitchen: Kitchen) -> None: + kitchen.get_dependency_tracker() diff --git a/scone/default/recipes/postgres.py b/scone/default/recipes/postgres.py index 4eff21e..4d40b75 100644 --- a/scone/default/recipes/postgres.py +++ b/scone/default/recipes/postgres.py @@ -1,17 +1,17 @@ from scone.default.utensils.db_utensils import PostgresTransaction -from scone.head import Head, Recipe -from scone.head.kitchen import Kitchen -from scone.head.recipe import Preparation +from scone.head.head import Head +from scone.head.kitchen import Kitchen, Preparation +from scone.head.recipe import Recipe, RecipeContext from scone.head.utils import check_type class PostgresDatabase(Recipe): _NAME = "pg-db" - def __init__(self, host: str, slug: str, args: dict, head: Head): - super().__init__(host, slug, args, head) + def __init__(self, recipe_context: RecipeContext, args: dict, head): + super().__init__(recipe_context, args, head) - self.database_name = slug + self.database_name = check_type(args.get("name"), str) self.owner = check_type(args.get("owner"), str) self.encoding = args.get("encoding", "utf8") self.collate = args.get("collate", "en_GB.utf8") @@ -56,10 +56,10 @@ class PostgresDatabase(Recipe): class PostgresUser(Recipe): _NAME = "pg-user" - def __init__(self, host: str, slug: str, args: dict, head: Head): - super().__init__(host, slug, args, head) + def __init__(self, recipe_context: RecipeContext, args: dict, head): + super().__init__(recipe_context, args, head) - self.user_name = slug + self.user_name = check_type(args.get("name"), str) self.password = check_type(args.get("password"), str) def prepare(self, preparation: Preparation, head: Head) -> None: diff --git a/scone/default/recipes/python.py b/scone/default/recipes/python.py index 1ed98a7..e1d7131 100644 --- a/scone/default/recipes/python.py +++ b/scone/default/recipes/python.py @@ -1,12 +1,11 @@ from pathlib import Path -from typing import Tuple, List +from typing import List, Tuple -from scone.default.recipes.apt import AptPackage from scone.default.steps.basic_steps import exec_no_fails from scone.default.steps.filesystem_steps import depend_remote_file -from scone.head import Head, Recipe -from scone.head.kitchen import Kitchen -from scone.head.recipe import Preparation +from scone.head.head import Head +from scone.head.kitchen import Kitchen, Preparation +from scone.head.recipe import Recipe, RecipeContext from scone.head.utils import check_type @@ -20,8 +19,8 @@ class PythonVenv(Recipe): _NAME = "python-venv" - def __init__(self, host: str, slug: str, args: dict, head: Head): - super().__init__(host, slug, args, head) + def __init__(self, recipe_context: RecipeContext, args: dict, head): + super().__init__(recipe_context, args, head) self.dir = check_type(args.get("dir"), str) self.interpreter = check_type(args.get("interpreter"), str) @@ -39,31 +38,29 @@ class PythonVenv(Recipe): def prepare(self, preparation: Preparation, head: Head): super().prepare(preparation, head) - preparation.needs("dir", str(Path(self.dir).parent)) + preparation.needs("directory", str(Path(self.dir).parent)) for name, flags in self.install: if "-r" in flags: preparation.needs("file", name) elif "git" in flags or "dir" in flags: - preparation.needs("dir", name) + preparation.needs("directory", name) final_script = str(Path(self.dir, "bin/python")) + preparation.provides("file", str(final_script)) if not self.no_apt_install: - preparation.subrecipe( - AptPackage( - self.get_host(), "@venv-apt", {"packages": ["python3-venv"]}, head - ) - ) - preparation.needs("apt-stage", "packages-installed") + # preparation.subrecipe( + # AptPackage(self.recipe_context, {"packages": ["python3-venv"]}) + # ) + # preparation.needs("apt-stage", "packages-installed") + preparation.needs("apt-package", "python3-venv") async def cook(self, kitchen: Kitchen): dt = kitchen.get_dependency_tracker() - await exec_no_fails( - kitchen, [self.interpreter, "-m", "venv", self.dir], "/" - ) + await exec_no_fails(kitchen, [self.interpreter, "-m", "venv", self.dir], "/") install_args = [] for name, flags in self.install: diff --git a/scone/default/recipes/systemd.py b/scone/default/recipes/systemd.py index d244974..e9b6b95 100644 --- a/scone/default/recipes/systemd.py +++ b/scone/default/recipes/systemd.py @@ -2,9 +2,9 @@ from typing import Dict from scone.default.recipes.filesystem import CommandOnChange from scone.default.utensils.basic_utensils import SimpleExec -from scone.head import Head, Recipe -from scone.head.kitchen import Kitchen -from scone.head.recipe import Preparation +from scone.head.head import Head +from scone.head.kitchen import Kitchen, Preparation +from scone.head.recipe import Recipe, RecipeContext from scone.head.utils import check_type, check_type_opt @@ -17,10 +17,11 @@ class SystemdUnit(Recipe): daemon_reloaders: Dict[str, CommandOnChange] = {} - def __init__(self, host: str, slug: str, args: dict, head: Head): - super().__init__(host, slug, args, head) + def __init__(self, recipe_context: RecipeContext, args: dict, head): + super().__init__(recipe_context, args, head) - self.unit_name = slug if "." in slug else slug + ".service" + unit = check_type(args.get("unit"), str) + self.unit_name = unit if "." in unit else unit + ".service" self.at = check_type(args.get("at"), str) self.enabled = check_type_opt(args.get("enabled"), bool) self.restart_on = check_type_opt(args.get("restart_on"), list) @@ -32,44 +33,49 @@ class SystemdUnit(Recipe): if self.enabled is not None: enable_recipe = SystemdEnabled( - self.get_host(), - self.unit_name, - {"enabled": self.enabled, "at": self.at, ".user": "root"}, - head, + self.recipe_context, + { + "unit": self.unit_name, + "enabled": self.enabled, + "at": self.at, + ".user": "root", + }, + None, ) preparation.subrecipe(enable_recipe) preparation.needs("systemd-stage", "enabled") - daemon_reloader = SystemdUnit.daemon_reloaders.get(self.get_host(), None) + daemon_reloader = SystemdUnit.daemon_reloaders.get( + self.recipe_context.sous, None + ) if not daemon_reloader: # TODO this should be replaced with a dedicated command which provides # those units. daemon_reloader = CommandOnChange( - self.get_host(), - "systemd-internal", + self.recipe_context, { "purpose": "systemd.daemon_reload", "command": ["systemctl", "daemon-reload"], "files": [], ".user": "root", }, - head, + None, ) preparation.subrecipe(daemon_reloader) - file_list = getattr(daemon_reloader, "_args")["files"] + # file_list = getattr(daemon_reloader, "_args")["files"] + file_list = [] # TODO file_list.append(self.at) if self.restart_on: service_reloader = CommandOnChange( - self.get_host(), - "systemd-internal", + self.recipe_context, { "purpose": "systemd.unit_reload", "command": ["systemctl", "reload", self.unit_name], "files": self.restart_on + [self.at], ".user": "root", }, - head, + None, ) preparation.subrecipe(service_reloader) @@ -85,10 +91,11 @@ class SystemdEnabled(Recipe): _NAME = "systemd-enabled" - def __init__(self, host: str, slug: str, args: dict, head: Head): - super().__init__(host, slug, args, head) + def __init__(self, recipe_context: RecipeContext, args: dict, head): + super().__init__(recipe_context, args, head) - self.unit_name = slug if "." in slug else slug + ".service" + unit = check_type(args.get("unit"), str) + self.unit_name = unit if "." in unit else unit + ".service" self.at = check_type(args.get("at"), str) self.enabled = check_type_opt(args.get("enabled"), bool) diff --git a/scone/default/steps/basic_steps.py b/scone/default/steps/basic_steps.py index b128959..24b468a 100644 --- a/scone/default/steps/basic_steps.py +++ b/scone/default/steps/basic_steps.py @@ -2,9 +2,9 @@ from pathlib import PurePath from typing import List, Optional, Union from scone.default.utensils.basic_utensils import SimpleExec -from scone.head import Recipe from scone.head.exceptions import CookingError from scone.head.kitchen import Kitchen, current_recipe +from scone.head.recipe import Recipe class ExecutionFailure(CookingError): @@ -47,8 +47,8 @@ async def exec_no_fails( raise ExecutionFailure( args, working_dir, - recipe.get_host(), - recipe.get_user(kitchen.head), + recipe.recipe_context.sous, + recipe.recipe_context.user, result, ) else: diff --git a/scone/default/steps/fridge_steps.py b/scone/default/steps/fridge_steps.py index 0027c17..8190d3e 100644 --- a/scone/default/steps/fridge_steps.py +++ b/scone/default/steps/fridge_steps.py @@ -4,10 +4,9 @@ from typing import List, Optional, Tuple, Union from jinja2 import Template -from scone.head import Head +from scone.head.head import Head from scone.head.kitchen import Kitchen - SUPERMARKET_RELATIVE = ".scone-cache/supermarket" diff --git a/scone/head/__init__.py b/scone/head/__init__.py index 20bef22..e69de29 100644 --- a/scone/head/__init__.py +++ b/scone/head/__init__.py @@ -1,201 +0,0 @@ -import copy -import itertools -import logging -import re -import sys -from os import path -from pathlib import Path -from typing import Any, Dict, List, Optional, Tuple, cast - -import toml -from nacl.encoding import URLSafeBase64Encoder - -from scone.common.loader import ClassLoader -from scone.common.misc import eprint -from scone.common.pools import Pools -from scone.head import menu_reader -from scone.head.menu_reader import HostMenu, Menu -from scone.head.recipe import Recipe, recipe_name_getter -from scone.head.secrets import SecretAccess -from scone.head.variables import Variables, merge_right_into_left_inplace - -logger = logging.getLogger(__name__) - - -class Head: - def __init__( - self, - directory: str, - recipe_loader: ClassLoader[Recipe], - menu: Menu, - sous: Dict[str, dict], - groups: Dict[str, List[str]], - secret_access: Optional[SecretAccess], - pools: Pools, - ): - self.directory = directory - self.recipe_loader = recipe_loader - self.menu = menu - self.souss = sous - self.groups = groups - self.secret_access = secret_access - self.variables: Dict[str, Variables] = dict() - self.pools = pools - - @staticmethod - def open(directory: str): - with open(path.join(directory, "scone.head.toml")) as head_toml: - head_data = toml.load(head_toml) - - secret_access: Optional[SecretAccess] = None - if "freezer" in head_data and "restaurant_id" in head_data["freezer"]: - secret_access = SecretAccess(head_data["freezer"]["restaurant_id"]) - secret_access.get_existing() - if not secret_access.key: - eprint("Failed to load freezer secret.") - sys.exit(12) - - recipe_module_roots = head_data.get("recipe_roots", ["scone.default.recipes"]) - - # load available recipes - recipe_loader: ClassLoader[Recipe] = ClassLoader(Recipe, recipe_name_getter) - for recipe_root in recipe_module_roots: - recipe_loader.add_package_root(recipe_root) - - sous = head_data.get("sous", dict()) - groups = head_data.get("group", dict()) - groups["all"] = list(sous.keys()) - - # load the menu - menu = menu_reader.parse_toml_menu_descriptors(path.join(directory, "menu")) - - pools = Pools() - - head = Head(directory, recipe_loader, menu, sous, groups, secret_access, pools) - head._load_variables() - return head - - def _preload_variables(self, who_for: str) -> Tuple[dict, dict]: - out_frozen: Dict[str, Any] = {} - out_chilled: Dict[str, Any] = {} - vardir = Path(self.directory, "vars", who_for) - - logger.debug("preloading vars for %s in %s", who_for, str(vardir)) - - for file in vardir.glob("*.vf.toml"): - if not file.is_file(): - continue - with file.open() as var_file: - logger.debug("Opened %s for frozen vars", file) - frozen_vars = cast(Dict[Any, Any], toml.load(var_file)) - - merge_right_into_left_inplace(out_frozen, frozen_vars) - - for file in vardir.glob("*.v.toml"): - if not file.is_file(): - continue - with file.open() as var_file: - logger.debug("Opened %s for vars", file) - chilled_vars = cast(Dict[Any, Any], toml.load(var_file)) - - merge_right_into_left_inplace(out_chilled, chilled_vars) - - to_transform = [out_frozen] - while to_transform: - next_dict = to_transform.pop() - for k, v in next_dict.items(): - if isinstance(v, str): - b64_secret = re.sub(r"\s", "", v) - if not self.secret_access: - raise RuntimeError("Secret access disabled; cannot thaw.") - next_dict[k] = self.secret_access.decrypt_bytes( - b64_secret.encode(), encoder=URLSafeBase64Encoder - ).decode() - elif isinstance(v, dict): - to_transform.append(v) - else: - raise ValueError(f"Not permitted in frozen variables file: '{v}'.") - - return out_chilled, out_frozen - - def _load_variables(self): - preload: Dict[str, Tuple[dict, dict]] = dict() - for who_name in itertools.chain(self.souss, self.groups): - preload[who_name] = self._preload_variables(who_name) - - for sous_name in self.souss: - order = ["all"] - order += [ - group - for group, members in self.groups.items() - if sous_name in members and group != "all" - ] - order.append(sous_name) - - chilled: Dict[str, Any] = {} - frozen: Dict[str, Any] = {} - - for who_name in order: - in_chilled, in_frozen = preload[who_name] - merge_right_into_left_inplace(chilled, in_chilled) - merge_right_into_left_inplace(frozen, in_frozen) - - sous_vars = Variables() - sous_vars.load_plain(frozen) - sous_vars.load_vars_with_substitutions(chilled) - - self.variables[sous_name] = sous_vars - - def _construct_hostmenu_for( - self, hostmenu: HostMenu, host: str, recipe_list: List[Recipe], head: "Head" - ) -> None: - for recipe_id, dishes in hostmenu.dishes.items(): - recipe_cls = self.recipe_loader.get_class(recipe_id) - if not recipe_cls: - raise RuntimeError(f"Unable to find recipe class for '{recipe_id}'.") - for slug, args in dishes.items(): - args = copy.deepcopy(args) - self.variables[host].substitute_inplace_in_dict(args) - recipe = recipe_cls.from_menu(host, slug, args, head) - recipe_list.append(recipe) - - def construct_recipes(self): - recipes = {} - for sous in self.souss: - logger.debug("Constructing recipes for %s", sous) - sous_recipe_list: List[Recipe] = [] - - # construct recipes for it only - sous_hm = self.menu.hostmenus.get(sous) - if sous_hm is not None: - self._construct_hostmenu_for(sous_hm, sous, sous_recipe_list, self) - - # construct recipes for it that are for groups it is in - for group, members in self.groups.items(): - if sous in members: - group_hm = self.menu.hostmenus.get(group) - if group_hm is not None: - self._construct_hostmenu_for( - group_hm, sous, sous_recipe_list, self - ) - recipes[sous] = sous_recipe_list - logger.info("Constructed %d recipes for %s.", len(sous_recipe_list), sous) - return recipes - - def debug_info(self) -> str: - lines = [] - lines.append("Head Configuration") - lines.append(" Sous List") - for name, sous in self.souss.items(): - lines.append(f" - {name} = {sous}") - lines.append("") - lines.append(" Sous Groups") - for name, group in self.groups.items(): - lines.append(f" - {name} = {group}") - lines.append("") - lines += [" " + line for line in str(self.recipe_loader).splitlines()] - lines.append("") - lines += [" " + line for line in str(self.menu).splitlines()] - lines.append("") - - return "\n".join(lines) diff --git a/scone/head/cli/__init__.py b/scone/head/cli/__init__.py index 57fec96..f9c8784 100644 --- a/scone/head/cli/__init__.py +++ b/scone/head/cli/__init__.py @@ -8,10 +8,10 @@ from pathlib import Path from scone.common.misc import eprint from scone.common.pools import Pools -from scone.head import Head -from scone.head.dependency_tracking import DependencyCache, run_dep_checks -from scone.head.kitchen import Kitchen -from scone.head.recipe import Preparation, Recipe +from scone.head import dot_emitter +from scone.head.dependency_tracking import DependencyCache +from scone.head.head import Head +from scone.head.kitchen import Kitchen, Preparation def cli() -> None: @@ -64,43 +64,38 @@ async def cli_async() -> int: eprint(f"Selected the following souss: {', '.join(hosts)}") - recipes_by_sous = head.construct_recipes() - - recipes_to_do = [] - for sous in hosts: - recipes_to_do += recipes_by_sous.get(sous, []) - - eprint(f"Preparing {len(recipes_to_do)} recipes…") - prepare = Preparation(recipes_to_do) + eprint("Preparing recipes…") + prepare = Preparation(head) start_ts = time.monotonic() - order = prepare.prepare(head) - notifying_provides = prepare.notifying_provides + prepare.prepare_all() del prepare end_ts = time.monotonic() eprint(f"Preparation completed in {end_ts - start_ts:.3f} s.") - eprint(f"{len(order)} courses planned.") + # eprint(f"{len(order)} courses planned.") + + dot_emitter.emit_dot(head.dag, Path(cdir, "dag.0.dot")) dep_cache = await DependencyCache.open( os.path.join(head.directory, "depcache.sqlite3") ) - eprint("Checking dependency cache…") - start_ts = time.monotonic() - depchecks = await run_dep_checks(head, dep_cache, order) - end_ts = time.monotonic() - eprint(f"Checking finished in {end_ts - start_ts:.3f} s.") # TODO show counts - - for epoch, items in enumerate(order): - print(f"----- Course {epoch} -----") - - for item in items: - if isinstance(item, Recipe): - state = depchecks[item].label.name - print(f" > recipe ({state}) {item}") - elif isinstance(item, tuple): - kind, ident, extra = item - print(f" - we now have {kind} {ident} {dict(extra)}") + # eprint("Checking dependency cache…") + # start_ts = time.monotonic() + # depchecks = await run_dep_checks(head, dep_cache, order) + # end_ts = time.monotonic() + # eprint(f"Checking finished in {end_ts - start_ts:.3f} s.") # TODO show counts + # + # for epoch, items in enumerate(order): + # print(f"----- Course {epoch} -----") + # + # for item in items: + # if isinstance(item, Recipe): + # state = depchecks[item].label.name + # print(f" > recipe ({state}) {item}") + # elif isinstance(item, tuple): + # kind, ident, extra = item + # print(f" - we now have {kind} {ident} {dict(extra)}") eprint("Ready to cook? [y/N]: ", end="") if argp.yes: @@ -110,16 +105,21 @@ async def cli_async() -> int: eprint("Stopping.") return 101 - kitchen = Kitchen(head, dep_cache, notifying_provides) + kitchen = Kitchen(head, dep_cache) - for epoch, epoch_items in enumerate(order): - print(f"Cooking Course {epoch} of {len(order)}") - await kitchen.run_epoch( - epoch_items, depchecks, concurrency_limit_per_host=8 - ) + # for epoch, epoch_items in enumerate(order): + # print(f"Cooking Course {epoch} of {len(order)}") + # await kitchen.run_epoch( + # epoch_items, depchecks, concurrency_limit_per_host=8 + # ) + # + # for sous in hosts: TODO this is not definitely safe + # await dep_cache.sweep_old(sous) - for sous in hosts: - await dep_cache.sweep_old(sous) + try: + await kitchen.cook_all() + finally: + dot_emitter.emit_dot(head.dag, Path(cdir, "dag.9.dot")) return 0 finally: diff --git a/scone/head/dag.py b/scone/head/dag.py new file mode 100644 index 0000000..4bed73b --- /dev/null +++ b/scone/head/dag.py @@ -0,0 +1,193 @@ +from collections import defaultdict +from enum import Enum +from typing import Dict, Optional, Set, Union + +import attr +from frozendict import frozendict + +from scone.head.recipe import Recipe + + +class RecipeState(Enum): + # Just loaded from menu, or otherwise created + LOADED = 0 + + # Has been prepared — we know its dependencies for this run + PREPARED = 1 + + # This recipe needs to be cooked, but may be blocked by dependencies + PENDING = 2 + + # This recipe is not blocked by any further + COOKABLE = 3 + + # This recipe is being cooked + BEING_COOKED = 4 + + # This recipe has been cooked! + COOKED = 5 + + # This recipe has not been cooked because it didn't need to be. + SKIPPED = 10 + + @staticmethod + def is_completed(state): + return state in (RecipeState.COOKED, RecipeState.SKIPPED) + + +@attr.s(auto_attribs=True) +class RecipeMeta: + """ + State of the recipe. + """ + + state: RecipeState = RecipeState.LOADED + + """ + Uncompleted incoming edge count. + """ + incoming_uncompleted: int = 0 + + +@attr.s(auto_attribs=True, frozen=True) +class Resource: + """ + Resource kind. + """ + + kind: str + + """ + Resource ID + """ + id: str + + """ + Resource sous, or None if it's on the head + """ + sous: Optional[str] + + """ + Optional dict of extra parameters needed to disambiguate the resource, + though should only be used where necessary and sensible to do so. + """ + # extra_params: Optional[frozendict[str, str]] = None + extra_params: Optional[frozendict] = None + + def __str__(self) -> str: + extra_str = "" if not self.extra_params else f" {self.extra_params!r}" + sous_str = "" if not self.sous else f" on {self.sous}" + return f"{self.kind}({self.id}){extra_str}{sous_str}" + + +@attr.s(auto_attribs=True) +class ResourceMeta: + """ + Whether the resource is completed or not. + A resource becomes completed when all its incoming edges are completed, + or it has no incoming edges and is not a hard need. + """ + + completed: bool = False + + """ + Uncompleted incoming edge count. + """ + incoming_uncompleted: int = 0 + + """ + Whether the resource is considered a hard need. + A resource is a hard need when we cannot proceed without something + providing it. + """ + hard_need: bool = False + + +Vertex = Union["Recipe", Resource] + + +class RecipeDag: + def __init__(self): + self.vertices: Set[Vertex] = set() + # edges go from A -> B where B needs A to run. + self.edges: Dict[Vertex, Set[Vertex]] = defaultdict(set) + self.reverse_edges: Dict[Vertex, Set[Vertex]] = defaultdict(set) + self.recipe_meta: Dict[Recipe, RecipeMeta] = dict() + self.resource_meta: Dict[Resource, ResourceMeta] = dict() + + self.resource_time: Dict[Resource, int] = dict() + + def add(self, vertex: Vertex): + self.vertices.add(vertex) + if isinstance(vertex, Recipe): + self.recipe_meta[vertex] = RecipeMeta() + elif isinstance(vertex, Resource): + self.resource_meta[vertex] = ResourceMeta() + + def needs( + self, needer: "Recipe", resource: Resource, soft_wants: bool = False + ) -> None: + if needer not in self.vertices: + raise ValueError(f"Needer {needer} not in vertices!") + + if resource not in self.vertices: + self.add(resource) + + if needer in self.edges[resource]: + return + + self.edges[resource].add(needer) + self.reverse_edges[needer].add(resource) + + needer_meta = self.recipe_meta[needer] + resource_meta = self.resource_meta[resource] + + if not soft_wants: + resource_meta.hard_need = True + + if not resource_meta.completed: + needer_meta.incoming_uncompleted += 1 + + def provides(self, provider: "Recipe", resource: Resource) -> None: + if provider not in self.vertices: + raise ValueError(f"Provider {provider} not in vertices!") + + if resource not in self.vertices: + self.add(resource) + + if resource in self.edges[provider]: + return + + self.edges[provider].add(resource) + self.reverse_edges[resource].add(provider) + + provider_meta = self.recipe_meta[provider] + resource_meta = self.resource_meta[resource] + + if not RecipeState.is_completed(provider_meta.state): + resource_meta.incoming_uncompleted += 1 + resource_meta.completed = False + else: + if resource_meta.incoming_uncompleted == 0: + resource_meta.completed = True + + def add_ordering(self, before: "Recipe", after: "Recipe") -> None: + if before not in self.vertices: + raise ValueError(f"Before {before} not in vertices!") + + if after not in self.vertices: + raise ValueError(f"After {after} not in vertices!") + + after_meta = self.recipe_meta[after] + before_meta = self.recipe_meta[before] + + if after in self.edges[before]: + return + + self.edges[before].add(after) + self.reverse_edges[after].add(before) + + if not RecipeState.is_completed(before_meta.state): + after_meta.incoming_uncompleted += 1 + # TODO if after_meta.state == + # TODO else ... diff --git a/scone/head/dependency_tracking.py b/scone/head/dependency_tracking.py index d2fe5df..5ed796e 100644 --- a/scone/head/dependency_tracking.py +++ b/scone/head/dependency_tracking.py @@ -1,11 +1,8 @@ -import asyncio import json import logging import time -from asyncio import Queue -from enum import Enum from hashlib import sha256 -from typing import Any, Dict, List, NamedTuple, Optional, Tuple, Union +from typing import TYPE_CHECKING, Any, Dict, Optional, Tuple, Union import aiosqlite import attr @@ -13,10 +10,12 @@ import canonicaljson import cattr from aiosqlite import Connection -from scone.common.misc import sha256_file -from scone.common.pools import Pools -from scone.head import Head, Recipe, Variables, recipe_name_getter -from scone.head.recipe import DepEle +from scone.head.dag import Resource +from scone.head.recipe import recipe_name_getter + +if TYPE_CHECKING: + from scone.head.dag import RecipeDag + from scone.head.recipe import Recipe canonicaljson.set_json_library(json) logger = logging.getLogger(__name__) @@ -24,9 +23,6 @@ logger = logging.getLogger(__name__) # TODO(security, low): how to prevent passwords being recovered from the # paramhashes in a dependency store? -# TODO(correctness, perf): recipes with @src@0 slugs should not be registered -# to a slug. - def _canonicalise_dict(input: Dict[str, Any]) -> Dict[str, Any]: output: Dict[str, Any] = {} @@ -48,139 +44,104 @@ def hash_dict(value: dict) -> str: ).hexdigest() -def paramhash_recipe(recipe: Recipe) -> str: - args = getattr(recipe, "_args").copy() - del args[".source"] - return hash_dict(args) +def paramhash_recipe(recipe: "Recipe") -> str: + return hash_dict( + { + "args": recipe.arguments, + "sous": recipe.recipe_context.sous, + "user": recipe.recipe_context.user, + } + ) @attr.s(auto_attribs=True) class DependencyBook: - var_names: List[str] - var_hash: str - fridge_hashes: Dict[str, str] - recipe_revisions: Dict[str, int] - dyn_sous_file_hashes: Dict[str, str] + provided: Dict[Resource, int] = dict() + watching: Dict[Resource, int] = dict() + last_changed: int = 0 + cache_data: Dict[str, Any] = dict() + ignored: bool = False - async def can_skip_static(self, head: Head, recipe: Recipe) -> bool: - from scone.default.steps.fridge_steps import search_in_fridge + # TODO(performance, feature): track more in-depth details, perhaps as a + # per-resource cache thing, so that we can track the info needed to know + # if it changed...? - # start with variables - sous_vars = head.variables[recipe.get_host()] - var_comp = dict() - for var_name in self.var_names: - try: - var_comp[var_name] = sous_vars.get_dotted(var_name) - except KeyError: - return False + def _unstructure(self) -> dict: + return { + "provided": cattr.unstructure(tuple(self.provided.items())), + "watching": cattr.unstructure(tuple(self.watching.items())), + "last_changed": self.last_changed, + "cache_data": self.cache_data, + "ignored": self.ignored, + } - if hash_dict(var_comp) != self.var_hash: - return False + @staticmethod + def _structure(dictionary: dict) -> "DependencyBook": + provided = {cattr.structure(k, Resource): v for k, v in dictionary["provided"]} + watching = {cattr.structure(k, Resource): v for k, v in dictionary["watching"]} - # now we have to check files in the fridge - for fridge_name, expected_hash in self.fridge_hashes.items(): - real_pathstr = search_in_fridge(head, fridge_name) - if not real_pathstr: - # vanished locally; that counts as a change - return False - real_hash = await asyncio.get_running_loop().run_in_executor( - head.pools.threaded, sha256_file, real_pathstr - ) - if real_hash != expected_hash: - return False + return DependencyBook( + provided=provided, + watching=watching, + last_changed=dictionary["last_changed"], + cache_data=dictionary["cache_data"], + ignored=dictionary["ignored"], + ) - return True - def has_dynamic(self) -> bool: - return len(self.dyn_sous_file_hashes) > 0 +cattr.global_converter.register_unstructure_hook( + DependencyBook, DependencyBook._unstructure +) +cattr.global_converter.register_structure_hook( + DependencyBook, DependencyBook._structure +) class DependencyTracker: - """ - Tracks the dependencies of a task and then inserts a row as needed. - """ + def __init__(self, book: DependencyBook, dag: "RecipeDag"): + self.book: DependencyBook = book + self._dag: RecipeDag = dag + self._time: int = int(time.time() * 1000) - def __init__(self, pools: Pools): - self._vars: Dict[str, Any] = {} - self._fridge: Dict[str, str] = {} - self._recipe_revisions: Dict[str, int] = {} - self._dyn_sous_files: Dict[str, str] = {} - self._ignored = False - self._pools = pools + def watch(self, resource: Resource) -> None: + # XXX self.book.watching[resource] = self._dag.resource_time[resource] + self.book.watching[resource] = -42 - def ignore(self): - """ - Call when dependency tracking is not desired (or not advanced enough to - be useful.) - """ - self._ignored = True + def provide(self, resource: Resource, time: Optional[int] = None) -> None: + if time is None: + time = self._time + self._dag.resource_time[resource] = time - async def register_fridge_file(self, fridge_path: str, real_path: str): - if fridge_path not in self._fridge: - f_hash = await asyncio.get_running_loop().run_in_executor( - self._pools.threaded, sha256_file, real_path - ) - self._fridge[fridge_path] = f_hash - - def register_recipe(self, recipe: Recipe): - cls = recipe.__class__ - rec_name = recipe_name_getter(cls) - if not rec_name: - return - self._recipe_revisions[rec_name] = getattr(cls, "_REVISION", None) + def ignore(self) -> None: + self.book.ignored = True def register_variable(self, variable: str, value: Union[dict, str, int]): - self._vars[variable] = value + # self._vars[variable] = value + raise NotImplementedError("time") - def register_remote_file(self, file: str, file_hash: str): - self._dyn_sous_files[file] = file_hash + def register_fridge_file(self, path: str): + # TODO this is not complete + fridge_res = Resource("fridge", path, None) + self.watch(fridge_res) - def make_depbook(self) -> Optional[DependencyBook]: - if self._ignored: - return None - dep_book = DependencyBook( - list(self._vars.keys()), - hash_dict(self._vars), - self._fridge.copy(), - self._recipe_revisions, - self._dyn_sous_files, - ) - return dep_book + def register_remote_file(self, path: str, sous: str): + # TODO this is not complete + file_res = Resource("file", path, sous=sous) + self.watch(file_res) - def get_j2_compatible_dep_var_proxies( - self, variables: Variables - ) -> Dict[str, "DependencyVarProxy"]: - result = {} - - for key, vars in variables.toplevel().items(): - result[key] = DependencyVarProxy(self, vars, key + ".") - - return result - - -class DependencyVarProxy: - """ - Provides convenient access to variables that also properly tracks - dependencies. - """ - - def __init__( - self, dependency_tracker: DependencyTracker, variables: dict, prefix: str = "" - ): - self._dvp_dt: DependencyTracker = dependency_tracker - self._dvp_prefix = prefix - self._dvp_vars = variables - - def __getattr__(self, key: str): - fully_qualified_varname = self._dvp_prefix + key - value = self._dvp_vars.get(key, ...) - if value is ...: - raise KeyError(f"Variable does not exist: {fully_qualified_varname}") - elif isinstance(value, dict): - return DependencyVarProxy(self._dvp_dt, value, key + ".") - else: - self._dvp_dt.register_variable(fully_qualified_varname, value) - return value + # def get_j2_compatible_dep_var_proxies( + # self, variables: Variables + # ) -> Dict[str, "DependencyVarProxy"]: + # # XXX BROKEN does not work for overrides + # result = {} + # + # if len("1"): + # raise NotImplementedError("BROKEN") + # + # for key, vars in variables.toplevel().items(): + # result[key] = DependencyVarProxy(self, vars, key + ".") + # + # return result class DependencyCache: @@ -195,16 +156,14 @@ class DependencyCache: await dc.db.executescript( """ CREATE TABLE IF NOT EXISTS dishcache ( - source_file TEXT, - host TEXT, - recipe_id TEXT, - slug TEXT, + -- source_file TEXT, + recipe_kind TEXT, paramhash TEXT, dep_book TEXT, ts INT, - PRIMARY KEY (source_file, host, recipe_id, slug, paramhash) + PRIMARY KEY (recipe_kind, paramhash) ); - CREATE INDEX IF NOT EXISTS dishcache_ts ON dishcache (ts); + -- CREATE INDEX IF NOT EXISTS dishcache_ts ON dishcache (ts); """ ) await dc.db.commit() @@ -223,25 +182,16 @@ class DependencyCache: ) await self.db.commit() - async def inquire(self, recipe: Recipe) -> Optional[Tuple[int, DependencyBook]]: + async def inquire(self, recipe: "Recipe") -> Optional[Tuple[int, DependencyBook]]: paramhash = paramhash_recipe(recipe) rows = await self.db.execute_fetchall( """ SELECT rowid, dep_book FROM dishcache - WHERE source_file = ? - AND host = ? - AND recipe_id = ? + WHERE recipe_kind = ? AND paramhash = ? - AND slug = ? LIMIT 1 """, - ( - recipe._args[".source"][0], - recipe.get_host(), - recipe_name_getter(recipe.__class__), - paramhash, - recipe._slug, - ), + (recipe_name_getter(recipe.__class__), paramhash,), ) rows = list(rows) if not rows: @@ -259,23 +209,20 @@ class DependencyCache: return rowid, dep_book - async def register(self, recipe: Recipe, dep_book: DependencyBook): + async def register(self, recipe: "Recipe", dep_book: DependencyBook): paramhash = paramhash_recipe(recipe) await self.db.execute( """ INSERT INTO dishcache - (source_file, host, recipe_id, slug, paramhash, dep_book, ts) - VALUES (?, ?, ?, ?, ?, ?, ?) - ON CONFLICT (source_file, host, recipe_id, paramhash, slug) + (recipe_kind, paramhash, dep_book, ts) + VALUES (?, ?, ?, ?) + ON CONFLICT (recipe_kind, paramhash) DO UPDATE SET dep_book = excluded.dep_book, ts = excluded.ts """, ( - recipe._args[".source"][0], - recipe.get_host(), recipe_name_getter(recipe.__class__), - recipe._slug, paramhash, canonicaljson.encode_canonical_json(cattr.unstructure(dep_book)), self.time, @@ -292,75 +239,3 @@ class DependencyCache: (self.time, rowid), ) await self.db.commit() - - -class CheckOutcomeLabel(Enum): - # Not in dependency cache, so must run. - NOT_CACHED = 0 - - # Dependency cache suggests we must rerun - MUST_REDO = 1 - - # Dependency cache suggests we are fine if dynamic dependencies haven't - # changed - CHECK_DYNAMIC = 2 - - # Dependency cache says we can skip; there are no dynamic dependencies - SAFE_TO_SKIP = 3 - - -DepCheckOutcome = NamedTuple( - "DepCheckOutcome", - (("label", CheckOutcomeLabel), ("book", Optional[DependencyBook])), -) - - -async def run_dep_checks( - head: Head, dep_cache: DependencyCache, order: List[List[DepEle]] -) -> Dict[Recipe, DepCheckOutcome]: - queue: Queue[Optional[Recipe]] = Queue(32) - outcomes = {} - - async def consumer(): - while True: - recipe = await queue.get() - if not recipe: - break - t = await dep_cache.inquire(recipe) - if t: - # we need to check if dependencies have changed… - rowid, dep_book = t - if await dep_book.can_skip_static(head, recipe): - # we will renew either way - await dep_cache.renew(rowid) - if dep_book.has_dynamic(): - # has dynamic dependencies - outcomes[recipe] = DepCheckOutcome( - CheckOutcomeLabel.CHECK_DYNAMIC, dep_book - ) - else: - # can skip! - outcomes[recipe] = DepCheckOutcome( - CheckOutcomeLabel.SAFE_TO_SKIP, None - ) - else: - outcomes[recipe] = DepCheckOutcome( - CheckOutcomeLabel.MUST_REDO, None - ) - else: - outcomes[recipe] = DepCheckOutcome(CheckOutcomeLabel.NOT_CACHED, None) - queue.task_done() - - async def producer(): - for course in order: - for recipe in course: - if isinstance(recipe, Recipe): - await queue.put(recipe) - await queue.join() - for worker in consumers: - await queue.put(None) - - consumers = [asyncio.create_task(consumer()) for _ in range(8)] - await asyncio.gather(*consumers, producer(), return_exceptions=False) - - return outcomes diff --git a/scone/head/dot_emitter.py b/scone/head/dot_emitter.py new file mode 100644 index 0000000..59d96ab --- /dev/null +++ b/scone/head/dot_emitter.py @@ -0,0 +1,57 @@ +from pathlib import Path +from typing import Dict + +from scone.head.dag import RecipeDag, RecipeState, Resource, Vertex +from scone.head.recipe import Recipe, recipe_name_getter + +state_to_colour = { + RecipeState.LOADED: "#000000", + RecipeState.PREPARED: "azure", + RecipeState.PENDING: "pink", + RecipeState.COOKABLE: "gold", + RecipeState.COOKED: "darkolivegreen1", + RecipeState.SKIPPED: "cadetblue1", + RecipeState.BEING_COOKED: "darkorange1", +} + + +def emit_dot(dag: RecipeDag, path_out: Path) -> None: + with open(path_out, "w") as fout: + fout.write("digraph recipedag {\n") + + ids: Dict[Vertex, str] = dict() + + fout.write("\t// Vertices\n") + + for idx, vertex in enumerate(dag.vertices): + vertex_id = f"v{idx}" + ids[vertex] = vertex_id + if isinstance(vertex, Recipe): + rec_meta = dag.recipe_meta[vertex] + label = ( + f"{recipe_name_getter(vertex.__class__)}" + f" [{rec_meta.incoming_uncompleted}]" + ) + colour = state_to_colour[rec_meta.state] + fout.write( + f'\t{vertex_id} [shape=box, label="{label}",' + f" style=filled, fillcolor={colour}];\n" + ) + elif isinstance(vertex, Resource): + label = str(vertex).replace("\\", "\\\\").replace('"', '\\"') + res_meta = dag.resource_meta[vertex] + colour = "darkolivegreen1" if res_meta.completed else "pink" + fout.write( + f'\t{vertex_id} [label="{label}",' + f" style=filled, fillcolor={colour}];\n" + ) + else: + raise ValueError(f"? vertex {vertex!r}") + + fout.write("\n\t// Edges\n") + + for from_vert, edges in dag.edges.items(): + for to_vert in edges: + fout.write(f"\t{ids[from_vert]} -> {ids[to_vert]};\n") + + fout.write("}\n") diff --git a/scone/head/grammar/scoml.tx b/scone/head/grammar/scoml.tx new file mode 100644 index 0000000..8482680 --- /dev/null +++ b/scone/head/grammar/scoml.tx @@ -0,0 +1,163 @@ +// root +Unit: + Block +; + +Comment: + /#.*$/ +; + +Block: + directives*=Directive + '' + recipes+=RecipeOrSubBlock +; + +RecipeOrSubBlock: + Recipe | SubBlock +; + + +SubBlock[ws=' \t']: + unique_id=ID '{' human=/.*$/ /\n/+ + block=Block + '}' +; + + +Directive: + UserDirective | SousDirective | ForDirective | ImportDirective | + RecipeEdgeDirective | ResourceEdgeDirective | ListenEdgeDirective +; + +UserDirective[ws=' \t']: + '@user' '=' user=ID /\n/+ +; + +SousDirective[ws=' \t']: + '@sous' '=' sous=ID /\n/+ +; + +ImportDirective[ws=' \t']: + '@import' importee=ID /\n/+ +; + + +ForDirective[ws=' \t']: + '@for' loop_variable=DottedIdString 'in' + ( + collection=DottedIdString /\n/+ + | + ':' /\n/ + list=NaturalList + ) +; + +ResourceEdgeDirectiveKind: + '@needs' | '@wants' | '@provides' +; + +ResourceEdgeDirective[ws=' \t']: + kind=ResourceEdgeDirectiveKind + resource=Resource +; + +RecipeEdgeDirectiveKind: + '@after' | '@before' +; + +RecipeEdgeDirective[ws=' \t']: + kind=RecipeEdgeDirectiveKind + ':' id=ID + // TODO 'on other sous' ? +; + +ListenEdgeDirectiveKind: + '@when' | '@only when' +; + +ListenEdgeDirective[ws=' \t']: + kind=ListenEdgeDirectiveKind + (recipe_id=ID | resource=Resource) + 'changes' +; + + +Resource: + type=ID '(' (primary=UnquotedString | primary=QuotedString) ')' + (extra_params=BraceDict)? + ('on' sous=ID)? +; + + +NaturalList: + elements+=NaturalListElement +; + +NaturalListElement[ws=' \t']: + //'-' item=KeyExpr /\n/+ + '-' KeyExpr /\n/+ +; + + +Recipe[ws=' \t']: + '[[' kind=DottedIdString (':' unique_id=DottedIdString)? ']]' human=/.*$/ /\n/+ + directives*=Directive + args*=RecipeArgument + /\n*/ +; + +RecipeArgument[ws=' \t']: + name=ID + ( + '=' value=ValueExpr /\n/+ + | + ':' /\n/ + value=NaturalList + ) +; + + +KeyExpr: + QuotedString | UnquotedString | Integer +; + +ValueExpr: + QuotedString | Integer | Boolean | BracketList | BraceDict | UnquotedString +; + +QuotedString: + value=STRING +; + +UnquotedString: + value=/[^\s\n,"()0-9]([^\n,"()]*[^\s\n,"()])?/ +; + +DottedIdString: + /[a-zA-Z_-][a-zA-Z0-9_\.-]*/ +; + +Integer: + value=INT +; + +Boolean: + value=BOOL +; + +BracketList[ws=' \t\n']: + '[' + items*=ValueExpr[','] + ']' +; + +BraceDict[ws=' \t']: + '{' + pairs*=DictPair[','] + '}' +; + +DictPair: + (key=KeyExpr) '=' (value=ValueExpr) +; diff --git a/scone/head/head.py b/scone/head/head.py new file mode 100644 index 0000000..b465791 --- /dev/null +++ b/scone/head/head.py @@ -0,0 +1,209 @@ +import itertools +import logging +import re +import sys +from os import path +from pathlib import Path +from typing import Any, Dict, Iterable, List, Optional, Tuple, cast + +import toml +from nacl.encoding import URLSafeBase64Encoder + +from scone.common.loader import ClassLoader +from scone.common.misc import eprint +from scone.common.pools import Pools +from scone.head.dag import RecipeDag +from scone.head.menu_reader import MenuLoader +from scone.head.recipe import Recipe, recipe_name_getter +from scone.head.secrets import SecretAccess +from scone.head.variables import Variables, merge_right_into_left_inplace + +logger = logging.getLogger(__name__) + + +class Head: + def __init__( + self, + directory: str, + recipe_loader: ClassLoader[Recipe], + sous: Dict[str, dict], + groups: Dict[str, List[str]], + secret_access: Optional[SecretAccess], + pools: Pools, + ): + self.directory = directory + self.recipe_loader = recipe_loader + self.dag = RecipeDag() + self.souss = sous + self.groups = groups + self.secret_access = secret_access + self.variables: Dict[str, Variables] = dict() + self.pools = pools + + @staticmethod + def open(directory: str): + with open(path.join(directory, "scone.head.toml")) as head_toml: + head_data = toml.load(head_toml) + + secret_access: Optional[SecretAccess] = None + if "freezer" in head_data and "restaurant_id" in head_data["freezer"]: + secret_access = SecretAccess(head_data["freezer"]["restaurant_id"]) + secret_access.get_existing() + if not secret_access.key: + eprint("Failed to load freezer secret.") + sys.exit(12) + + recipe_module_roots = head_data.get("recipe_roots", ["scone.default.recipes"]) + + # load available recipes + recipe_loader: ClassLoader[Recipe] = ClassLoader(Recipe, recipe_name_getter) + for recipe_root in recipe_module_roots: + recipe_loader.add_package_root(recipe_root) + + sous = head_data.get("sous", dict()) + groups = head_data.get("group", dict()) + groups["all"] = list(sous.keys()) + + pools = Pools() + + head = Head(directory, recipe_loader, sous, groups, secret_access, pools) + head._load_variables() + head._load_menus() + return head + + def _preload_variables(self, who_for: str) -> Tuple[dict, dict]: + out_frozen: Dict[str, Any] = {} + out_chilled: Dict[str, Any] = {} + vardir = Path(self.directory, "vars", who_for) + + logger.debug("preloading vars for %s in %s", who_for, str(vardir)) + + for file in vardir.glob("*.vf.toml"): + if not file.is_file(): + continue + with file.open() as var_file: + logger.debug("Opened %s for frozen vars", file) + frozen_vars = cast(Dict[Any, Any], toml.load(var_file)) + + merge_right_into_left_inplace(out_frozen, frozen_vars) + + for file in vardir.glob("*.v.toml"): + if not file.is_file(): + continue + with file.open() as var_file: + logger.debug("Opened %s for vars", file) + chilled_vars = cast(Dict[Any, Any], toml.load(var_file)) + + merge_right_into_left_inplace(out_chilled, chilled_vars) + + to_transform = [out_frozen] + while to_transform: + next_dict = to_transform.pop() + for k, v in next_dict.items(): + if isinstance(v, str): + b64_secret = re.sub(r"\s", "", v) + if not self.secret_access: + raise RuntimeError("Secret access disabled; cannot thaw.") + next_dict[k] = self.secret_access.decrypt_bytes( + b64_secret.encode(), encoder=URLSafeBase64Encoder + ).decode() + elif isinstance(v, dict): + to_transform.append(v) + else: + raise ValueError(f"Not permitted in frozen variables file: '{v}'.") + + return out_chilled, out_frozen + + def _load_variables(self): + preload: Dict[str, Tuple[dict, dict]] = dict() + for who_name in itertools.chain(self.souss, self.groups): + preload[who_name] = self._preload_variables(who_name) + + for sous_name in self.souss: + order = ["all"] + order += [ + group + for group, members in self.groups.items() + if sous_name in members and group != "all" + ] + order.append(sous_name) + + chilled: Dict[str, Any] = {} + frozen: Dict[str, Any] = {} + + for who_name in order: + in_chilled, in_frozen = preload[who_name] + merge_right_into_left_inplace(chilled, in_chilled) + merge_right_into_left_inplace(frozen, in_frozen) + + sous_vars = Variables(None) + sous_vars.load_plain(frozen) + sous_vars.load_vars_with_substitutions(chilled) + + self.variables[sous_name] = sous_vars + + def _load_menus(self): + loader = MenuLoader(Path(self.directory, "menu"), self) + loader.load_menus_in_dir() + loader.dagify_all() + + # TODO remove + # def _construct_hostmenu_for( + # self, hostmenu: "HostMenu", host: str, recipe_list: List[Recipe], head: "Head" + # ) -> None: + # for recipe_id, dishes in hostmenu.dishes.items(): + # recipe_cls = self.recipe_loader.get_class(recipe_id) + # if not recipe_cls: + # raise RuntimeError(f"Unable to find recipe class for '{recipe_id}'.") + # for slug, args in dishes.items(): + # args = copy.deepcopy(args) + # self.variables[host].substitute_inplace_in_dict(args) + # recipe = recipe_cls.from_menu(host, slug, args, head) + # recipe_list.append(recipe) + # + # def construct_recipes(self): + # recipes = {} + # for sous in self.souss: + # logger.debug("Constructing recipes for %s", sous) + # sous_recipe_list: List[Recipe] = [] + # + # # construct recipes for it only + # sous_hm = self.menu.hostmenus.get(sous) + # if sous_hm is not None: + # self._construct_hostmenu_for(sous_hm, sous, sous_recipe_list, self) + # + # # construct recipes for it that are for groups it is in + # for group, members in self.groups.items(): + # if sous in members: + # group_hm = self.menu.hostmenus.get(group) + # if group_hm is not None: + # self._construct_hostmenu_for( + # group_hm, sous, sous_recipe_list, self + # ) + # recipes[sous] = sous_recipe_list + # logger.info("Constructed %d recipes for %s.", len(sous_recipe_list), sous) + # return recipes + + def debug_info(self) -> str: + lines = [] + lines.append("Head Configuration") + lines.append(" Sous List") + for name, sous in self.souss.items(): + lines.append(f" - {name} = {sous}") + lines.append("") + lines.append(" Sous Groups") + for name, group in self.groups.items(): + lines.append(f" - {name} = {group}") + # lines.append("") + # lines += [" " + line for line in str(self.recipe_loader).splitlines()] + # lines.append("") + # lines += [" " + line for line in str(self.menu).splitlines()] + # lines.append("") + + return "\n".join(lines) + + def get_souss_for_hostspec(self, hostspec: str) -> Iterable[str]: + if hostspec in self.souss: + return (hostspec,) + else: + return self.groups[hostspec] diff --git a/scone/head/kitchen.py b/scone/head/kitchen.py index b814527..87452b2 100644 --- a/scone/head/kitchen.py +++ b/scone/head/kitchen.py @@ -1,27 +1,27 @@ import asyncio import logging -from asyncio import Future -from collections import defaultdict +from asyncio import Future, Queue +from collections import defaultdict, deque from contextvars import ContextVar -from typing import Any, Coroutine, Dict, List, Optional, Tuple, Type, TypeVar +from typing import Any, Deque, Dict, Optional, Tuple, Type, TypeVar -import attr import cattr +from frozendict import frozendict from scone.common.chanpro import Channel, ChanProHead -from scone.default.utensils.dynamic_dependencies import CanSkipDynamic -from scone.head import Head, Recipe, sshconn +from scone.common.misc import eprint +from scone.head import sshconn +from scone.head.dag import RecipeMeta, RecipeState, Resource, Vertex from scone.head.dependency_tracking import ( - CheckOutcomeLabel, - DepCheckOutcome, + DependencyBook, DependencyCache, DependencyTracker, ) -from scone.head.recipe import DepEle, DependencySpec +from scone.head.head import Head +from scone.head.recipe import Recipe from scone.sous import utensil_namer from scone.sous.utensils import Utensil - logger = logging.getLogger(__name__) current_recipe: ContextVar[Recipe] = ContextVar("current_recipe") @@ -29,21 +29,95 @@ current_recipe: ContextVar[Recipe] = ContextVar("current_recipe") A = TypeVar("A") +class Preparation: + def __init__(self, head: Head): + self.dag = head.dag + self.head = head + self._queue: Deque[Tuple[Recipe, RecipeMeta]] = deque() + self._current_recipe: Optional[Recipe] = None + + def needs( + self, + requirement: str, + identifier: str, + hard: bool = True, + sous: Optional[str] = "(self)", + **extra_identifiers: Any, + ) -> None: + assert self._current_recipe is not None + + if sous == "(self)": + sous = self._current_recipe.recipe_context.sous + + resource = Resource( + requirement, identifier, sous, frozendict(extra_identifiers) + ) + + self.dag.needs(self._current_recipe, resource, not hard) + + def wants(self, requirement: str, identifier: str, **extra_identifiers: Any): + return self.needs(requirement, identifier, hard=False, **extra_identifiers) + + def provides( + self, + requirement: str, + identifier: str, + sous: Optional[str] = "(self)", + **extra_identifiers: Any, + ) -> None: + assert self._current_recipe is not None + + if sous == "(self)": + sous = self._current_recipe.recipe_context.sous + + resource = Resource( + requirement, identifier, sous, frozendict(extra_identifiers) + ) + + self.dag.provides(self._current_recipe, resource) + + def after(self, other_recipe: "Recipe"): + assert self._current_recipe is not None + self.dag.add_ordering(other_recipe, self._current_recipe) + + def before(self, other_recipe: "Recipe"): + assert self._current_recipe is not None + self.dag.add_ordering(self._current_recipe, other_recipe) + + def subrecipe(self, sub: "Recipe"): + self.dag.add(sub) + self._queue.append((sub, self.dag.recipe_meta[sub])) + + def prepare_all(self) -> None: + for recipe in self.dag.vertices: + if not isinstance(recipe, Recipe): + continue + meta = self.dag.recipe_meta[recipe] + if meta.state != RecipeState.LOADED: + continue + self._queue.append((recipe, meta)) + + while self._queue: + recipe, meta = self._queue.popleft() + self._current_recipe = recipe + recipe.prepare(self, self.head) + self._current_recipe = None + meta.state = RecipeState.PREPARED + + class Kitchen: def __init__( - self, - head: Head, - dependency_store: DependencyCache, - notifying_provides: Dict[Recipe, List[DependencySpec]], + self, head: "Head", dependency_store: DependencyCache, ): self._chanproheads: Dict[Tuple[str, str], Future[ChanProHead]] = dict() self._dependency_store = dependency_store self._dependency_trackers: Dict[Recipe, DependencyTracker] = defaultdict( - lambda: DependencyTracker(head.pools) + lambda: DependencyTracker(DependencyBook(), head.dag) ) self.head = head - self._notifying_provides = notifying_provides - self.notifications: Dict[DependencySpec, bool] = dict() + self.last_updated_ats: Dict[Resource, int] = dict() + self._cookable: Queue[Optional[Vertex]] = Queue() + self._sleeper_slots: int = 0 def get_dependency_tracker(self): return self._dependency_trackers[current_recipe.get()] @@ -61,7 +135,7 @@ class Kitchen: None, user, connection_details["souscmd"], - connection_details.get("dangerous_debug_logging", False) + connection_details.get("dangerous_debug_logging", False), ) except Exception: logger.error("Failed to open SSH connection", exc_info=True) @@ -75,32 +149,121 @@ class Kitchen: return await self._chanproheads[hostuser] - async def run_epoch( - self, - epoch: List[DepEle], - depchecks: Dict[Recipe, DepCheckOutcome], - concurrency_limit_per_host: int = 5, - ): - per_host_lists: Dict[str, List[Recipe]] = defaultdict(lambda: []) + async def cook_all(self): + # TODO fridge emitter - # sort into per-host lists - for recipe in epoch: - if isinstance(recipe, Recipe): - if depchecks[recipe].label != CheckOutcomeLabel.SAFE_TO_SKIP: - per_host_lists[recipe.get_host()].append(recipe) + num_workers = 8 - coros: List[Coroutine] = [] + self._sleeper_slots = num_workers - 1 - for host, recipes in per_host_lists.items(): - host_work_pool = HostWorkPool(recipes, depchecks) - coros.append(host_work_pool.cook_all(self, concurrency_limit_per_host)) + for vertex in self.head.dag.vertices: + if isinstance(vertex, Recipe): + rec_meta = self.head.dag.recipe_meta[vertex] + if rec_meta.incoming_uncompleted == 0: + rec_meta.state = RecipeState.COOKABLE + self._cookable.put_nowait(vertex) + else: + rec_meta.state = RecipeState.PENDING + elif isinstance(vertex, Resource): + res_meta = self.head.dag.resource_meta[vertex] + if res_meta.incoming_uncompleted == 0: + res_meta.completed = True + if res_meta.hard_need: + needers = self.head.dag.edges[vertex] + needers_str = "".join(f" - {n}\n" for n in needers) + raise RuntimeError( + f"Hard need 「{vertex}」 not satisfiable." + f" Needed by:\n{needers_str}" + ) + self._cookable.put_nowait(vertex) - await asyncio.gather(*coros, return_exceptions=False) + workers = [] + for _ in range(num_workers): + workers.append(self._cooking_worker()) + + await asyncio.gather(*workers, return_exceptions=False) + + async def _cooking_worker(self): + dag = self.head.dag + while True: + if self._sleeper_slots <= 0 and self._cookable.empty(): + self._sleeper_slots -= 1 + self._cookable.put_nowait(None) + break + + self._sleeper_slots -= 1 + try: + next_job = await self._cookable.get() + finally: + self._sleeper_slots += 1 + + if next_job is None: + continue + + if isinstance(next_job, Recipe): + meta = dag.recipe_meta[next_job] + + # TODO try to deduplicate + meta.state = RecipeState.BEING_COOKED + current_recipe.set(next_job) + eprint(f"cooking {next_job}") + await next_job.cook(self) + eprint(f"cooked {next_job}") + # TODO cook + # TODO store depbook + await self._store_dependency(next_job) + meta.state = RecipeState.COOKED + elif isinstance(next_job, Resource): + eprint(f"have {next_job}") + pass + + for edge in dag.edges[next_job]: + logger.debug("updating edge: %s → %s", next_job, edge) + if isinstance(edge, Recipe): + rec_meta = dag.recipe_meta[edge] + rec_meta.incoming_uncompleted -= 1 + logger.debug("has %d incoming", rec_meta.incoming_uncompleted) + if ( + rec_meta.incoming_uncompleted == 0 + and rec_meta.state == RecipeState.PENDING + ): + rec_meta.state = RecipeState.COOKABLE + self._cookable.put_nowait(edge) + elif isinstance(edge, Resource): + res_meta = dag.resource_meta[edge] + res_meta.incoming_uncompleted -= 1 + logger.debug("has %d incoming", res_meta.incoming_uncompleted) + if res_meta.incoming_uncompleted == 0 and not res_meta.completed: + res_meta.completed = True + self._cookable.put_nowait(edge) + + # async def run_epoch( + # self, + # epoch: List[DepEle], + # depchecks: Dict[Recipe, DepCheckOutcome], + # concurrency_limit_per_host: int = 5, + # ): + # per_host_lists: Dict[str, List[Recipe]] = defaultdict(lambda: []) + # + # # sort into per-host lists + # for recipe in epoch: + # if isinstance(recipe, Recipe): + # if depchecks[recipe].label != CheckOutcomeLabel.SAFE_TO_SKIP: + # per_host_lists[recipe.get_host()].append(recipe) + # + # coros: List[Coroutine] = [] + # + # for host, recipes in per_host_lists.items(): + # host_work_pool = HostWorkPool(recipes, depchecks) + # coros.append(host_work_pool.cook_all(self, concurrency_limit_per_host)) + # + # await asyncio.gather(*coros, return_exceptions=False) async def start(self, utensil: Utensil) -> Channel: utensil_name = utensil_namer(utensil.__class__) recipe = current_recipe.get() - cph = await self.get_chanprohead(recipe.get_host(), recipe.get_user(self.head)) + context = recipe.recipe_context + cph = await self.get_chanprohead(context.sous, context.user) # noinspection PyDataclass payload = cattr.unstructure(utensil) @@ -143,50 +306,51 @@ class Kitchen: dependency_tracker = self._dependency_trackers.pop(recipe, None) if not dependency_tracker: raise KeyError(f"Recipe {recipe} has not been tracked.") - depbook = dependency_tracker.make_depbook() + depbook = dependency_tracker.book if depbook: await self._dependency_store.register(recipe, depbook) -@attr.s(auto_attribs=True) -class HostWorkPool: - jobs: List[Recipe] - depchecks: Dict[Recipe, DepCheckOutcome] - next_job: int = 0 - - async def cook_all(self, kitchen: Kitchen, concurrency_limit: int): - num_jobs = len(self.jobs) - concurrency_limit = min(num_jobs, concurrency_limit) - - async def cooker(): - while self.next_job < num_jobs: - recipe = self.jobs[self.next_job] - self.next_job += 1 - - current_recipe.set(recipe) - depcheck = self.depchecks.get(recipe) - if ( - depcheck is not None - and depcheck.label == CheckOutcomeLabel.CHECK_DYNAMIC - ): - book = depcheck.book - assert book is not None - can_skip = await kitchen.ut1( - CanSkipDynamic(book.dyn_sous_file_hashes) - ) - if can_skip: - continue - - await recipe.cook(kitchen) - # if successful, store dependencies - await kitchen._store_dependency(recipe) - nps = kitchen._notifying_provides.get(recipe, None) - if nps: - for depspec in nps: - if depspec not in kitchen.notifications: - # default to changed if not told otherwise - kitchen.notifications[depspec] = True - - await asyncio.gather( - *[asyncio.create_task(cooker()) for _ in range(concurrency_limit)] - ) +# +# @attr.s(auto_attribs=True) +# class HostWorkPool: +# jobs: List[Recipe] +# depchecks: Dict[Recipe, DepCheckOutcome] +# next_job: int = 0 +# +# async def cook_all(self, kitchen: Kitchen, concurrency_limit: int): +# num_jobs = len(self.jobs) +# concurrency_limit = min(num_jobs, concurrency_limit) +# +# async def cooker(): +# while self.next_job < num_jobs: +# recipe = self.jobs[self.next_job] +# self.next_job += 1 +# +# current_recipe.set(recipe) +# depcheck = self.depchecks.get(recipe) +# if ( +# depcheck is not None +# and depcheck.label == CheckOutcomeLabel.CHECK_DYNAMIC +# ): +# book = depcheck.book +# assert book is not None +# can_skip = await kitchen.ut1( +# CanSkipDynamic(book.dyn_sous_file_hashes) +# ) +# if can_skip: +# continue +# +# await recipe.cook(kitchen) +# # if successful, store dependencies +# await kitchen._store_dependency(recipe) +# nps = kitchen._notifying_provides.get(recipe, None) +# if nps: +# for depspec in nps: +# if depspec not in kitchen.notifications: +# # default to changed if not told otherwise +# kitchen.notifications[depspec] = True +# +# await asyncio.gather( +# *[asyncio.create_task(cooker()) for _ in range(concurrency_limit)] +# ) diff --git a/scone/head/menu_reader.py b/scone/head/menu_reader.py index 604fbd0..c038e5e 100644 --- a/scone/head/menu_reader.py +++ b/scone/head/menu_reader.py @@ -1,125 +1,469 @@ +import logging import os -from os import path +import typing +from collections import defaultdict from pathlib import Path -from typing import Any, Dict +from typing import Any, Dict, Iterable, List, Optional, Tuple, Union -import toml +import attr +import textx + +from scone.head.dag import RecipeDag, Resource +from scone.head.recipe import RecipeContext + +if typing.TYPE_CHECKING: + from scone.head.head import Head + from scone.head.recipe import Recipe + from scone.head.variables import Variables -class Menu: - def __init__(self): - self.hostmenus = {} - - def get_host(self, name: str): - if name in self.hostmenus: - return self.hostmenus[name] - else: - new = HostMenu() - self.hostmenus[name] = new - return new - - def __str__(self): - lines = ["Menu"] - - for hostspec, hostmenu in self.hostmenus.items(): - lines.append(f" on {hostspec} :-") - lines += [" " + line for line in str(hostmenu).split("\n")] - lines.append("") - - return "\n".join(lines) +def _load_grammar(): + grammar_file_path = Path(Path(__file__).parent, "grammar", "scoml.tx") + return textx.metamodel_from_file(grammar_file_path) -class HostMenu: - def __init__(self): - self.dishes = {} +scoml_grammar = _load_grammar() +scoml_classes = scoml_grammar.namespaces["scoml"] - def __str__(self): - lines = ["Menu"] - - for recipe, dishes in self.dishes.items(): - lines.append(f"- recipe {recipe}") - lines += [f" - {slug} {args}" for slug, args in dishes.items()] - lines.append("") - - return "\n".join(lines) +logger = logging.getLogger(__name__) -def parse_toml_menu_descriptor( - filename: str, menu: Menu, default_hostspec: str, source_name: str = None -) -> None: - source_name = source_name or filename +@attr.s(auto_attribs=True) +class ForDirective: + """ + For loop_variable in collection + """ - with open(filename, "r") as f: - menu_desc: Dict[str, Any] = toml.load(f) # type: ignore + # The name of the variable that should be taken on by the iteration + loop_variable: str - if "-----" in menu_desc: - magic_tweaks = menu_desc["-----"] - del menu_desc["-----"] + # List of literals or str for a variable (by name) + collection: Union[str, List[Any]] + + +@attr.s(auto_attribs=True) +class RecipeEdgeDirective: + # "after" or "before" + kind: str + + recipe_id: str + + +@attr.s(auto_attribs=True) +class ResourceEdgeDirective: + # "needs", "wants" or "provides" + kind: str + + resource: Resource + + +@attr.s(auto_attribs=True) +class MenuBlock: + id: Optional[None] + + human: str + + contents: List[Union["MenuBlock", "MenuRecipe"]] + + parent: Optional["MenuBlock"] + + user_directive: Optional[str] = None + sous_directive: Optional[str] = None + for_directives: List[ForDirective] = [] + import_directives: List[str] = [] + recipe_edges: List[RecipeEdgeDirective] = [] + resource_edges: List[ResourceEdgeDirective] = [] + + +@attr.s(auto_attribs=True, eq=False) +class MenuRecipe: + kind: str + + id: Optional[str] + + human: str + + arguments: Dict[str, Any] + + parent: MenuBlock + + user_directive: Optional[str] = None + sous_directive: Optional[str] = None + for_directives: List[ForDirective] = [] + recipe_edges: List[RecipeEdgeDirective] = [] + resource_edges: List[ResourceEdgeDirective] = [] + + +def convert_textx_value(txvalue) -> Any: + if isinstance(txvalue, scoml_classes["NaturalList"]): + return [convert_textx_value(element) for element in txvalue.elements] + elif ( + isinstance(txvalue, scoml_classes["QuotedString"]) + or isinstance(txvalue, scoml_classes["UnquotedString"]) + or isinstance(txvalue, scoml_classes["Integer"]) + or isinstance(txvalue, scoml_classes["Boolean"]) + ): + return txvalue.value + elif isinstance(txvalue, scoml_classes["BracketList"]): + return [convert_textx_value(item) for item in txvalue.items] + elif isinstance(txvalue, scoml_classes["BraceDict"]): + result = dict() + for pair in txvalue.pairs: + result[convert_textx_value(pair.key)] = convert_textx_value(pair.value) else: - magic_tweaks = {} + raise ValueError(f"Unknown SCOML value: {txvalue}") - for key, dishes in menu_desc.items(): - # print(key, "=", dishes) - key_parts = key.split("--") - lkp = len(key_parts) - if lkp == 1: - # pg-db.synapse - hostspec = default_hostspec - recipe = key_parts[0] - elif lkp == 2: - if key_parts[1] == "": - # fridge-copy-- - hostspec = default_hostspec - recipe = key_parts[0] + +def convert_textx_recipe(txrecipe_or_subblock, parent: Optional[MenuBlock]): + if isinstance(txrecipe_or_subblock, scoml_classes["SubBlock"]): + txsubblock = txrecipe_or_subblock + menu_block = convert_textx_block(txsubblock.block, parent) + menu_block.id = txsubblock.unique_id + menu_block.human = txsubblock.human.strip() + + return menu_block + elif isinstance(txrecipe_or_subblock, scoml_classes["Recipe"]): + assert parent is not None + txrecipe = txrecipe_or_subblock + args = dict() + + for arg in txrecipe.args: + args[arg.name] = convert_textx_value(arg.value) + recipe = MenuRecipe( + txrecipe.kind, txrecipe.unique_id, txrecipe.human.strip(), args, parent + ) + + for directive in txrecipe.directives: + if isinstance(directive, scoml_classes["UserDirective"]): + recipe.user_directive = directive.user + elif isinstance(directive, scoml_classes["SousDirective"]): + recipe.user_directive = directive.sous else: - # server1--pg-db.synapse - hostspec = key_parts[0] - recipe = key_parts[1] - elif lkp == 3 and key_parts[2] == "": - # server2--fridge-copy-- - hostspec = key_parts[0] - recipe = key_parts[1] + raise ValueError(f"Unknown directive {directive}") + + return recipe + else: + raise ValueError("Neither Recipe nor SubBlock: " + str(txrecipe_or_subblock)) + + +def convert_textx_resource(txresource) -> Resource: + extra_params = None + if txresource.extra_params is not None: + extra_params = convert_textx_value(txresource.extra_params) + + sous: Optional[str] = "(self)" # XXX docstring to warn about this + if txresource.sous: + if txresource.sous == "head": + sous = None else: - raise ValueError(f"Don't understand key: {key}") + sous = txresource.sous - hostmenu = menu.get_host(hostspec) - if recipe in hostmenu.dishes: - mdishes = hostmenu.dishes[recipe] + return Resource(txresource.type, txresource.primary, sous, extra_params) + + +def convert_textx_block(txblock, parent: Optional[MenuBlock]) -> MenuBlock: + recipes: List[Union[MenuRecipe, MenuBlock]] = [] + block = MenuBlock(None, "", recipes, parent) + + for recipe in txblock.recipes: + recipes.append(convert_textx_recipe(recipe, block)) + + for directive in txblock.directives: + if isinstance(directive, scoml_classes["UserDirective"]): + # TODO(expectation): error if multiple user directives + block.user_directive = directive.user + elif isinstance(directive, scoml_classes["SousDirective"]): + block.sous_directive = directive.sous + elif isinstance(directive, scoml_classes["ForDirective"]): + block.for_directives.append( + ForDirective( + directive.loop_variable, + directive.collection or convert_textx_value(directive.list), + ) + ) + elif isinstance(directive, scoml_classes["ImportDirective"]): + block.import_directives.append(directive.importee) + elif isinstance(directive, scoml_classes["ResourceEdgeDirective"]): + block.resource_edges.append( + ResourceEdgeDirective( + directive.kind, convert_textx_resource(directive.resource) + ) + ) + elif isinstance(directive, scoml_classes["RecipeEdgeDirective"]): + block.recipe_edges.append(RecipeEdgeDirective(directive.kind, directive.id)) else: - mdishes = {} - hostmenu.dishes[recipe] = mdishes + raise ValueError(f"Unknown directive {directive}") - if isinstance(dishes, dict): - for slug, args in dishes.items(): - if slug in mdishes: - raise ValueError( - f"Conflict in: Host {hostspec} Recipe {recipe} Dish Slug {slug}" - ) - mdishes[slug] = args - args[".source"] = (source_name, key, slug) - args[".m"] = magic_tweaks - elif isinstance(dishes, list): - for idx, args in enumerate(dishes): - slug = f"@{source_name}@{idx}" - if slug in mdishes: - raise ValueError( - f"Conflict in: Host {hostspec} Recipe {recipe} Dish Slug {slug}" - ) - mdishes[slug] = args - args[".source"] = (source_name, key, idx) - args[".m"] = magic_tweaks + return block -def parse_toml_menu_descriptors(menu_dir: str) -> Menu: - menu = Menu() - for root, dirs, files in os.walk(menu_dir): - for file in files: - full_path = path.join(root, file) - if file.endswith(".toml"): +SousName = str +ForLoopIndices = Tuple[int, ...] +SingleRecipeInvocationKey = Tuple[SousName, ForLoopIndices] + + +class MenuLoader: + def __init__(self, menu_dir: Path, head: "Head"): + self._menu_dir: Path = menu_dir + self._units: Dict[str, MenuBlock] = dict() + self._recipes: Dict[ + MenuRecipe, Dict[SingleRecipeInvocationKey, Recipe] + ] = defaultdict(dict) + self._dag: RecipeDag = head.dag + self._head = head + + @staticmethod + def _load_menu_unit(full_path: Path, relative: str) -> MenuBlock: + model = scoml_grammar.model_from_file(full_path) + return convert_textx_block(model, None) + + def load(self, unit_name: str): + if unit_name in self._units: + return + + full_path = Path(self._menu_dir, unit_name + ".scoml") + menu_block = self._load_menu_unit(full_path, unit_name) + self._units[unit_name] = menu_block + for unit in menu_block.import_directives: + self.load(unit) + + def resolve_ref( + self, referrer: Union[MenuBlock, MenuRecipe], reference: str + ) -> Optional[Union[MenuBlock, MenuRecipe]]: + """ + Resolves a recipe or block reference + :param referrer: recipe or block making the reference that needs to be resolved + :param reference: string reference that needs to be resolved + :return: If found, the menu block or recipe that was referenced. + """ + # TODO(feature): need to think about scoping rules and then figure + # this one out + return None + + def _get_first_common_ancestor( + self, one: Union[MenuBlock, MenuRecipe], other: Union[MenuBlock, MenuRecipe] + ) -> Optional[MenuBlock]: + ancestors_of_a = set() + + a: Optional[Union[MenuBlock, MenuRecipe]] = one + b: Optional[Union[MenuBlock, MenuRecipe]] = other + + while a: + ancestors_of_a.add(a) + a = a.parent + + while b: + if b in ancestors_of_a: + assert isinstance(b, MenuBlock) + return b + b = b.parent + + return None + + def get_related_instances( + self, + sous: str, + referrer_indices: Tuple[int, ...], + referrer: Union[MenuBlock, MenuRecipe], + menu_recipe: MenuRecipe, + ) -> List["Recipe"]: + result = [] + + first_common_ancestor = self._get_first_common_ancestor(referrer, menu_recipe) + + a: Union[MenuBlock, MenuRecipe] = referrer + strip = 0 + while a != first_common_ancestor: + strip += len(a.for_directives) + parent = a.parent + assert parent is not None + a = parent + + a = menu_recipe + extra = 0 + while a != first_common_ancestor: + extra += len(a.for_directives) + parent = a.parent + assert parent is not None + a = parent + + for (instance_sous, indices), recipe in self._recipes[menu_recipe].items(): + if sous != instance_sous: + continue + if len(referrer_indices) - strip + extra == len(indices): + if referrer_indices[:-strip] == indices[:-extra]: + result.append(recipe) + else: + logger.warning( + "Mismatch in indices length %r - %d + %d ~/~ %r", + referrer_indices, + strip, + extra, + indices, + ) + + return result + + def dagify_recipe( + self, + recipe: MenuRecipe, + hierarchical_source: str, + fors: Tuple[ForDirective, ...], + applicable_souss: Iterable[str], + applicable_user: Optional[str], + ): + recipe_class = self._head.recipe_loader.get_class(recipe.kind) + + fors = fors + tuple(recipe.for_directives) + + if recipe.user_directive: + applicable_user = recipe.user_directive + + if recipe.sous_directive: + applicable_souss = self._head.get_souss_for_hostspec(recipe.sous_directive) + + for sous in applicable_souss: + if not applicable_user: + applicable_user = self._head.souss[sous]["user"] + assert applicable_user is not None + + sous_vars = self._head.variables[sous] + for _vars, for_indices in self._for_apply(fors, sous_vars, tuple()): + context = RecipeContext( + sous=sous, + user=applicable_user, + slug=recipe.id, + hierarchical_source=hierarchical_source, # XXX + human=recipe.human, + ) + args = recipe.arguments # noqa + # XXX sub in vars + instance: Recipe = recipe_class.new( + context, recipe.arguments, self._head + ) + self._recipes[recipe][(sous, for_indices)] = instance + self._dag.add(instance) + + def dagify_block( + self, + block: MenuBlock, + hierarchical_source: str, + fors: Tuple[ForDirective, ...], + applicable_souss: Iterable[str], + applicable_user: Optional[str], + ): + fors = fors + tuple(block.for_directives) + + if block.user_directive: + applicable_user = block.user_directive + + if block.sous_directive: + applicable_souss = self._head.get_souss_for_hostspec(block.sous_directive) + + for content in block.contents: + if isinstance(content, MenuBlock): + block_name = content.id or "?" + self.dagify_block( + content, + f"{hierarchical_source}.{block_name}", + fors, + applicable_souss, + applicable_user, + ) + elif isinstance(content, MenuRecipe): + self.dagify_recipe( + content, + hierarchical_source, + fors, + applicable_souss, + applicable_user, + ) + else: + raise ValueError(f"{content}?") + + def postdagify_recipe( + self, + recipe: MenuRecipe, + fors: Tuple[ForDirective, ...], + applicable_souss: Iterable[str], + ): + # add fors + fors = fors + tuple(recipe.for_directives) + + if recipe.sous_directive: + applicable_souss = self._head.get_souss_for_hostspec(recipe.sous_directive) + + for sous in applicable_souss: + sous_vars = self._head.variables[sous] + for _vars, for_indices in self._for_apply(fors, sous_vars, tuple()): + instance = self._recipes[recipe][(sous, for_indices)] # noqa + + # XXX apply specific edges here including those from parent + + def postdagify_block( + self, + block: MenuBlock, + fors: Tuple[ForDirective, ...], + applicable_souss: Iterable[str], + ): + # XXX pass down specific edges here + + fors = fors + tuple(block.for_directives) + + if block.sous_directive: + applicable_souss = self._head.get_souss_for_hostspec(block.sous_directive) + + for content in block.contents: + if isinstance(content, MenuBlock): + self.postdagify_block(content, fors, applicable_souss) + elif isinstance(content, MenuRecipe): + self.postdagify_recipe(content, fors, applicable_souss) + else: + raise ValueError(f"{content}?") + + def dagify_all(self): + for name, unit in self._units.items(): + self.dagify_block( + unit, name, tuple(), self._head.get_souss_for_hostspec("all"), None + ) + for _name, unit in self._units.items(): + self.postdagify_block( + unit, tuple(), self._head.get_souss_for_hostspec("all") + ) + + def _for_apply( + self, fors: Tuple[ForDirective, ...], vars: "Variables", accum: Tuple[int, ...] + ) -> Iterable[Tuple["Variables", Tuple[int, ...]]]: + if not fors: + yield vars, accum + return + + head = fors[0] + tail = fors[1:] + + to_iter = head.collection + if isinstance(to_iter, str): + to_iter = vars.get_dotted(to_iter) + + if not isinstance(to_iter, list): + raise ValueError(f"to_iter = {to_iter!r} not a list") + + for idx, item in enumerate(to_iter): + new_vars = Variables(vars) + new_vars.set_dotted(head.loop_variable, item) + yield from self._for_apply(tail, new_vars, accum + (idx,)) + + def load_menus_in_dir(self) -> RecipeDag: + dag = RecipeDag() + + for root, dirs, files in os.walk(self._menu_dir): + for file in files: + if not file.endswith(".scoml"): + continue + # full_path = Path(root, file) # load this as a menu file pieces = file.split(".") - default_hostspec = pieces[-2] - relative = str(Path(full_path).relative_to(menu_dir)) - parse_toml_menu_descriptor(full_path, menu, default_hostspec, relative) + assert len(pieces) == 2 + self.load(pieces[0]) - return menu + return dag diff --git a/scone/head/recipe.py b/scone/head/recipe.py index 66ba443..ede8f70 100644 --- a/scone/head/recipe.py +++ b/scone/head/recipe.py @@ -1,100 +1,11 @@ import typing -from collections import defaultdict -from typing import Any, Dict, List, Optional, Set, Tuple, Union +from typing import Any, Dict, Optional -import toposort +import attr if typing.TYPE_CHECKING: - from scone.head import Head - from scone.head.kitchen import Kitchen - -DependencySpec = Tuple[str, str, frozenset] -DepEle = Union["Recipe", DependencySpec] - - -class Preparation: - """ - Preparation on a single host. - This is done before we start deduplicating (could this be improved?). - """ - - def __init__(self, recipes: List["Recipe"]): - self._to_process = recipes.copy() - self.recipes = recipes - self._dependencies: Dict[DepEle, Set[DepEle]] = {} - self._recipe_now: Optional[Recipe] = None - self._hard_needs: Set[DependencySpec] = set() - self.notifying_provides: Dict[Recipe, List[DependencySpec]] = defaultdict( - lambda: [] - ) - - def _make_depspec_tuple( - self, requirement: str, identifier: str, **extra_identifiers: Any - ) -> DependencySpec: - if "host" not in extra_identifiers: - assert self._recipe_now is not None - extra_identifiers["host"] = self._recipe_now.get_host() - return requirement, identifier, frozenset(extra_identifiers.items()) - - def needs( - self, - requirement: str, - identifier: str, - hard: bool = False, - **extra_identifiers: Any, - ) -> None: - assert self._recipe_now is not None - if self._recipe_now not in self._dependencies: - self._dependencies[self._recipe_now] = set() - depspec_tuple = self._make_depspec_tuple( - requirement, identifier, **extra_identifiers - ) - self._dependencies[self._recipe_now].add(depspec_tuple) - if hard: - self._hard_needs.add(depspec_tuple) - - def provides( - self, - requirement: str, - identifier: str, - notifying: bool = False, - **extra_identifiers: Any, - ) -> None: - assert self._recipe_now is not None - depspec_tuple = self._make_depspec_tuple( - requirement, identifier, **extra_identifiers - ) - if depspec_tuple not in self._dependencies: - self._dependencies[depspec_tuple] = set() - self._dependencies[depspec_tuple].add(self._recipe_now) - if notifying: - self.notifying_provides[self._recipe_now].append(depspec_tuple) - - def subrecipe(self, recipe: "Recipe"): - assert self._recipe_now is not None - self._to_process.append(recipe) - self.recipes.append(recipe) - args = getattr(recipe, "_args") - if ".source" not in args: - file, key, slug = getattr(self._recipe_now, "_args")[".source"] - args[".source"] = (file, key + "-sub", slug) - - def prepare(self, head: "Head") -> List[List]: - while self._to_process: - next_recipe = self._to_process.pop() - self._recipe_now = next_recipe - next_recipe.prepare(self, head) - - for hard_need in self._hard_needs: - if hard_need not in self._dependencies: - raise ValueError(f"Hard need not satisfied (no entry): {hard_need}") - if not self._dependencies[hard_need]: - raise ValueError(f"Hard need not satisfied (empty): {hard_need}") - - self._dependencies[self._make_depspec_tuple(".internal", "completed")] = set( - self.recipes - ) - return list(toposort.toposort(self._dependencies)) + from scone.head.head import Head + from scone.head.kitchen import Kitchen, Preparation def recipe_name_getter(c: typing.Type["Recipe"]) -> Optional[str]: @@ -103,60 +14,36 @@ def recipe_name_getter(c: typing.Type["Recipe"]) -> Optional[str]: return None +@attr.s(auto_attribs=True) +class RecipeContext: + sous: str + + user: str + + slug: Optional[str] + + hierarchical_source: Optional[str] + + human: str + + class Recipe: - def __init__(self, host: str, slug: str, args: dict, head: "Head"): - self._host = host - self._slug = slug - self._args = args - - def get_host(self): - return self._host - - def get_tweak(self, name: str, default: Any) -> Any: - dotname = f".{name}" - if dotname in self._args: - return self._args[dotname] - elif ".m" in self._args and name in self._args[".m"]: - return self._args[".m"][name] - else: - return default - - def get_user(self, head: "Head") -> str: - user = self.get_tweak("user", head.souss[self._host]["user"]) - assert isinstance(user, str) - return user + def __init__( + self, recipe_context: RecipeContext, args: Dict[str, Any], head: "Head" + ): + self.recipe_context = recipe_context + self.arguments = args @classmethod - def from_menu(cls, host: str, slug: str, args: dict, head: "Head") -> "Recipe": - return cls(host, slug, args, head) + def new(cls, recipe_context: RecipeContext, args: Dict[str, Any], head: "Head"): + return cls(recipe_context, args, head) - def prepare(self, preparation: Preparation, head: "Head") -> None: - preparation.needs("os-user", self.get_user(head)) + def prepare(self, preparation: "Preparation", head: "Head") -> None: + preparation.needs("os-user", self.recipe_context.user) # TODO(feature) allow merging per-task and per-menu tweaks # TODO(feature) allow need/provide custom things, not just user-units - afters = self.get_tweak("needs", None) - if afters: - for after in afters: - if isinstance(after, list) and len(after) == 2: - # allow requesting custom needs - preparation.needs(after[0], after[1]) - continue - if not isinstance(after, str): - raise ValueError("needs tweak should be list of strings or pairs.") - preparation.needs("user-unit", after) - - befores = self.get_tweak("provides", None) - if befores: - if isinstance(befores, str): - preparation.provides("user-unit", befores) - else: - for before in befores: - if not isinstance(before, str): - raise ValueError("provides tweak should be list of strings.") - preparation.provides("user-unit", before) - async def cook(self, kitchen: "Kitchen") -> None: raise NotImplementedError @@ -164,8 +51,12 @@ class Recipe: cls = self.__class__ if hasattr(cls, "RECIPE_NAME"): return ( - f"{cls.RECIPE_NAME}({cls.__name__}) {self._slug} " # type: ignore - f"on {self._host} ({self._args})" + f"{cls.RECIPE_NAME}({cls.__name__})" # type: ignore + f" {self.recipe_context.human} " + f"on {self.recipe_context.sous} ({self.arguments})" ) else: - return f"{cls.__name__} {self._slug} on {self._host} ({self._args})" + return ( + f"{cls.__name__} {self.recipe_context.human}" + f" on {self.recipe_context.sous} ({self.arguments})" + ) diff --git a/scone/head/sshconn.py b/scone/head/sshconn.py index 2a73fff..d4b7266 100644 --- a/scone/head/sshconn.py +++ b/scone/head/sshconn.py @@ -27,7 +27,7 @@ async def open_ssh_sous( client_key: Optional[str], requested_user: str, sous_command: str, - debug_logging: bool = False + debug_logging: bool = False, ) -> Tuple[ChanPro, Channel]: if client_key: opts = SSHClientConnectionOptions(username=user, client_keys=[client_key]) @@ -42,8 +42,11 @@ async def open_ssh_sous( command = sous_command if debug_logging: - command = f"tee /tmp/sconnyin-{requested_user} | {command} 2>/tmp/sconnyerr-{requested_user} " \ - f"| tee /tmp/sconnyout-{requested_user}" + command = ( + f"tee /tmp/sconnyin-{requested_user} " + f"| {command} 2>/tmp/sconnyerr-{requested_user} " + f"| tee /tmp/sconnyout-{requested_user}" + ) process: SSHClientProcess = await conn.create_process(command, encoding=None) diff --git a/scone/head/variables.py b/scone/head/variables.py index f291303..0bc779e 100644 --- a/scone/head/variables.py +++ b/scone/head/variables.py @@ -1,5 +1,5 @@ from enum import Enum -from typing import Any, Dict, List, NamedTuple +from typing import Any, Dict, List, NamedTuple, Optional ExpressionPart = NamedTuple("ExpressionPart", [("kind", str), ("value", str)]) @@ -84,8 +84,9 @@ def merge_right_into_left_inplace(left: dict, right: dict): class Variables: - def __init__(self): + def __init__(self, delegate: Optional["Variables"]): self._vars: Dict[str, Any] = {} + self._delegate: Optional[Variables] = delegate def get_dotted(self, name: str) -> Any: current = self._vars @@ -95,6 +96,8 @@ class Variables: current = current[k] return current except KeyError: + if self._delegate: + return self._delegate.get_dotted(name) raise KeyError("No variable: " + name) def has_dotted(self, name: str) -> bool: @@ -102,6 +105,8 @@ class Variables: self.get_dotted(name) return True except KeyError: + if self._delegate: + return self._delegate.has_dotted(name) return False def set_dotted(self, name: str, value: Any): diff --git a/setup.py b/setup.py index 0c0423b..08d0c90 100644 --- a/setup.py +++ b/setup.py @@ -27,7 +27,8 @@ REQUIRED = [ "toml~=0.10.1", "attrs~=19.3.0", "cattrs~=1.0.0", - "canonicaljson~=1.2.0" + "canonicaljson~=1.2.0", + "immutabledict==1.0.0" ]