New DAG execution model, works! But no dedupe yet

This commit is contained in:
Olivier 'reivilibre' 2020-10-07 21:11:48 +01:00
parent 328c99924c
commit 58af07bbb2
25 changed files with 1687 additions and 915 deletions

5
.gitignore vendored Normal file
View File

@ -0,0 +1,5 @@
/.idea
__pycache__
/scone.egg-info
/dist

View File

@ -21,3 +21,9 @@ ignore_missing_imports = True
[mypy-asyncpg]
ignore_missing_imports = True
[mypy-frozendict]
ignore_missing_imports = True
[mypy-textx]
ignore_missing_imports = True

View File

@ -3,7 +3,9 @@
# import sys
# from typing import List
#
# from scone.head import Head, Recipe
# from scone.head.head import Head
# from scone.head.recipe import Recipe
# from scone.head.kitchen import Kitchen
# from scone.head.recipe import Preparation

View File

@ -1,9 +1,9 @@
from typing import Dict, List, Set, Tuple
from scone.default.utensils.basic_utensils import SimpleExec
from scone.head import Head, Recipe
from scone.head.kitchen import Kitchen
from scone.head.recipe import Preparation
from scone.head.head import Head
from scone.head.kitchen import Kitchen, Preparation
from scone.head.recipe import Recipe, RecipeContext
from scone.head.utils import check_type
@ -17,8 +17,8 @@ class AptInstallInternal(Recipe):
# TODO(extension, low): expand this into apt-install-now if we need
# the flexibility
def __init__(self, host: str, slug: str, args: dict, head: "Head"):
super().__init__(host, slug, args, head)
def __init__(self, recipe_context: RecipeContext, args: dict, head):
super().__init__(recipe_context, args, head)
self.packages: Set[str] = set()
@ -66,23 +66,37 @@ class AptPackage(Recipe):
internal_installers: Dict[Tuple[Head, str], AptInstallInternal] = {}
def __init__(self, host: str, slug: str, args: dict, head: Head):
super().__init__(host, slug, args, head)
def __init__(self, recipe_context: RecipeContext, args: dict, head):
super().__init__(recipe_context, args, head)
self.packages: List[str] = check_type(args["packages"], list)
def prepare(self, preparation: Preparation, head: Head) -> None:
super().prepare(preparation, head)
pair = (head, self.get_host())
if pair not in AptPackage.internal_installers:
install_internal = AptInstallInternal(self.get_host(), "internal", {}, head)
AptPackage.internal_installers[pair] = install_internal
preparation.subrecipe(install_internal)
preparation.provides("apt-stage", "internal-install-packages")
internal_installer = AptPackage.internal_installers.get(pair)
assert internal_installer is not None
internal_installer.packages.update(self.packages)
for package in self.packages:
preparation.provides("apt-package", package)
async def cook(self, kitchen: Kitchen) -> None:
# can't be tracked
kitchen.get_dependency_tracker().ignore()
# this is a one-off task assuming everything works
kitchen.get_dependency_tracker()
if self.packages:
update = await kitchen.ut1areq(
SimpleExec(["apt-get", "-yq", "update"], "/"), SimpleExec.Result
)
if update.exit_code != 0:
raise RuntimeError(
f"apt update failed with err {update.exit_code}: {update.stderr!r}"
)
install_args = ["apt-get", "-yq", "install"]
install_args += list(self.packages)
install = await kitchen.ut1areq(
SimpleExec(install_args, "/"), SimpleExec.Result
)
if install.exit_code != 0:
raise RuntimeError(
f"apt install failed with err {install.exit_code}:"
f" {install.stderr!r}"
)

View File

@ -12,9 +12,9 @@ from scone.default.utensils.basic_utensils import (
Stat,
)
from scone.default.utensils.dynamic_dependencies import HasChangedInSousStore
from scone.head import Head, Recipe
from scone.head.kitchen import Kitchen
from scone.head.recipe import Preparation
from scone.head.head import Head
from scone.head.kitchen import Kitchen, Preparation
from scone.head.recipe import Recipe, RecipeContext
from scone.head.utils import check_type, check_type_opt
@ -28,7 +28,7 @@ class DeclareFile(Recipe):
_NAME = "declare-file"
def prepare(self, preparation: Preparation, head: Head):
preparation.provides("file", self._args["path"])
preparation.provides("file", self.arguments["path"])
async def cook(self, kitchen: Kitchen):
# mark as tracked.
@ -45,7 +45,7 @@ class DeclareDirectory(Recipe):
_NAME = "declare-dir"
def prepare(self, preparation: Preparation, head: Head):
preparation.provides("directory", self._args["path"])
preparation.provides("directory", self.arguments["path"])
async def cook(self, kitchen: Kitchen):
# mark as tracked.
@ -59,8 +59,8 @@ class EnsureDirectory(Recipe):
_NAME = "directory"
def __init__(self, host: str, slug: str, args: dict, head: "Head"):
super().__init__(host, slug, args, head)
def __init__(self, recipe_context: RecipeContext, args: dict, head):
super().__init__(recipe_context, args, head)
parents = args.get("parents", 0)
assert isinstance(parents, int)
@ -74,7 +74,7 @@ class EnsureDirectory(Recipe):
self.parents = parents
self.mode = parse_mode(mode, directory=True)
self._make: List[str] = []
self.targ_user = args.get("owner", self.get_user(head))
self.targ_user = args.get("owner", recipe_context.user)
self.targ_group = args.get("group", self.targ_user)
def prepare(self, preparation: Preparation, head: "Head"):
@ -123,8 +123,8 @@ class ExtractTar(Recipe):
_NAME = "tar-extract"
def __init__(self, host: str, slug: str, args: dict, head: "Head"):
super().__init__(host, slug, args, head)
def __init__(self, recipe_context: RecipeContext, args: dict, head):
super().__init__(recipe_context, args, head)
self.tar = check_type(args.get("tar"), str)
self.dir = check_type(args.get("dir"), str)
@ -165,8 +165,8 @@ class RunScript(Recipe):
_NAME = "script-run"
def __init__(self, host: str, slug: str, args: dict, head: "Head"):
super().__init__(host, slug, args, head)
def __init__(self, recipe_context: RecipeContext, args: dict, head):
super().__init__(recipe_context, args, head)
self.working_dir = check_type(args.get("working_dir"), str)
@ -196,8 +196,8 @@ class CommandOnChange(Recipe):
_NAME = "command-on-change"
def __init__(self, host: str, slug: str, args: dict, head: Head):
super().__init__(host, slug, args, head)
def __init__(self, recipe_context: RecipeContext, args: dict, head):
super().__init__(recipe_context, args, head)
self.purpose = check_type(args.get("purpose"), str)
self.command = check_type(args.get("command"), list)
@ -232,8 +232,8 @@ class GitCheckout(Recipe):
# declare SAFE_TO_SKIP. Perhaps we want to stop that unless you opt out?
# But oh well for now.
def __init__(self, host: str, slug: str, args: dict, head: Head):
super().__init__(host, slug, args, head)
def __init__(self, recipe_context: RecipeContext, args: dict, head):
super().__init__(recipe_context, args, head)
self.repo_src = check_type(args.get("src"), str)
self.dest_dir = check_type(args.get("dest"), str)
@ -270,9 +270,7 @@ class GitCheckout(Recipe):
stat = await k.ut1a(Stat(self.dest_dir), Stat.Result)
if stat is None:
# doesn't exist; git init it
await exec_no_fails(
k, ["git", "init", self.dest_dir], "/"
)
await exec_no_fails(k, ["git", "init", self.dest_dir], "/")
stat = await k.ut1a(Stat(self.dest_dir), Stat.Result)
if stat is None:
@ -283,29 +281,30 @@ class GitCheckout(Recipe):
# add the remote, removing it first to ensure it's what we want
# don't care if removing fails
await k.ut1areq(SimpleExec(["git", "remote", "remove", "scone"], self.dest_dir), SimpleExec.Result)
await k.ut1areq(
SimpleExec(["git", "remote", "remove", "scone"], self.dest_dir),
SimpleExec.Result,
)
await exec_no_fails(
k, ["git", "remote", "add", "scone", self.repo_src], self.dest_dir
)
# fetch the latest from the remote
await exec_no_fails(
k, ["git", "fetch", "scone"], self.dest_dir
)
await exec_no_fails(k, ["git", "fetch", "scone"], self.dest_dir)
# figure out what ref we want to use
# TODO(performance): fetch only this ref?
ref = self.ref or f"scone/{self.branch}"
# switch to that ref
await exec_no_fails(
k, ["git", "switch", "--detach", ref], self.dest_dir
)
await exec_no_fails(k, ["git", "switch", "--detach", ref], self.dest_dir)
# if we use submodules
if self.submodules:
await exec_no_fails(
k, ["git", "submodule", "update", "--init", "--recursive"], self.dest_dir
k,
["git", "submodule", "update", "--init", "--recursive"],
self.dest_dir,
)
for expected in self.expect:
@ -313,10 +312,16 @@ class GitCheckout(Recipe):
# TODO(performance, low): parallelise these
stat = await k.ut1a(Stat(expected_path_str), Stat.Result)
if not stat:
raise RuntimeError(f"expected {expected_path_str} to exist but it did not")
raise RuntimeError(
f"expected {expected_path_str} to exist but it did not"
)
if stat.dir and not expected.endswith("/"):
raise RuntimeError(f"expected {expected_path_str} to exist as a file but it is a dir")
raise RuntimeError(
f"expected {expected_path_str} to exist as a file but it is a dir"
)
if not stat.dir and expected.endswith("/"):
raise RuntimeError(f"expected {expected_path_str} to exist as a dir but it is a file")
raise RuntimeError(
f"expected {expected_path_str} to exist as a dir but it is a file"
)

View File

@ -8,11 +8,15 @@ from urllib.request import urlretrieve
from scone.common.misc import sha256_file
from scone.common.modeutils import DEFAULT_MODE_FILE, parse_mode
from scone.default.steps import fridge_steps
from scone.default.steps.fridge_steps import FridgeMetadata, load_and_transform, SUPERMARKET_RELATIVE
from scone.default.utensils.basic_utensils import WriteFile, Chown
from scone.head import Head
from scone.head.kitchen import Kitchen
from scone.head.recipe import Preparation, Recipe
from scone.default.steps.fridge_steps import (
SUPERMARKET_RELATIVE,
FridgeMetadata,
load_and_transform,
)
from scone.default.utensils.basic_utensils import Chown, WriteFile
from scone.head.head import Head
from scone.head.kitchen import Kitchen, Preparation
from scone.head.recipe import Recipe, RecipeContext
from scone.head.utils import check_type
@ -23,8 +27,9 @@ class FridgeCopy(Recipe):
_NAME = "fridge-copy"
def __init__(self, host: str, slug: str, args: dict, head: Head):
super().__init__(host, slug, args, head)
def __init__(self, recipe_context: RecipeContext, args: dict, head: Head):
super().__init__(recipe_context, args, head)
fp = fridge_steps.search_in_fridge(head, args["src"])
if fp is None:
raise ValueError(f"Cannot find {args['src']} in the fridge.")
@ -44,7 +49,7 @@ class FridgeCopy(Recipe):
mode = args.get("mode", DEFAULT_MODE_FILE)
assert isinstance(mode, str) or isinstance(mode, int)
self.fridge_path: str = args["src"]
self.fridge_path: str = check_type(args["src"], str)
self.real_path: Path = fp
self.fridge_meta: FridgeMetadata = meta
self.mode = parse_mode(mode, directory=False)
@ -56,7 +61,7 @@ class FridgeCopy(Recipe):
async def cook(self, k: Kitchen) -> None:
data = await load_and_transform(
k, self.fridge_meta, self.real_path, self.get_host()
k, self.fridge_meta, self.real_path, self.recipe_context.sous
)
dest_str = str(self.destination)
chan = await k.start(WriteFile(dest_str, self.mode))
@ -69,9 +74,7 @@ class FridgeCopy(Recipe):
# hash_of_data = sha256_bytes(data)
# k.get_dependency_tracker().register_remote_file(dest_str, hash_of_data)
await k.get_dependency_tracker().register_fridge_file(
self.fridge_path, self.real_path
)
k.get_dependency_tracker().register_fridge_file(self.fridge_path)
class Supermarket(Recipe):
@ -84,8 +87,8 @@ class Supermarket(Recipe):
# dict of target path → future that will complete when it's downloaded
in_progress: Dict[str, Future] = dict()
def __init__(self, host: str, slug: str, args: dict, head: "Head"):
super().__init__(host, slug, args, head)
def __init__(self, recipe_context: RecipeContext, args: dict, head):
super().__init__(recipe_context, args, head)
self.url = args.get("url")
assert isinstance(self.url, str)
@ -101,7 +104,7 @@ class Supermarket(Recipe):
else:
self.destination = Path(args["dest"]).resolve()
self.owner = check_type(args.get("owner", self.get_user(head)), str)
self.owner = check_type(args.get("owner", self.recipe_context.user), str)
self.group = check_type(args.get("group", self.owner), str)
mode = args.get("mode", DEFAULT_MODE_FILE)
@ -115,7 +118,9 @@ class Supermarket(Recipe):
async def cook(self, kitchen: "Kitchen"):
# need to ensure we download only once, even in a race…
supermarket_path = Path(kitchen.head.directory, SUPERMARKET_RELATIVE, self.sha256)
supermarket_path = Path(
kitchen.head.directory, SUPERMARKET_RELATIVE, self.sha256
)
if self.sha256 in Supermarket.in_progress:
await Supermarket.in_progress[self.sha256]
@ -136,7 +141,7 @@ Downloaded by {self}
self.url,
str(supermarket_path),
self.sha256,
note
note,
),
)

View File

@ -4,9 +4,9 @@ from typing import Optional
from scone.default.steps import linux_steps
from scone.default.utensils.linux_utensils import GetPasswdEntry
from scone.head import Head, Recipe
from scone.head.kitchen import Kitchen
from scone.head.recipe import Preparation
from scone.head.head import Head
from scone.head.kitchen import Kitchen, Preparation
from scone.head.recipe import Recipe, RecipeContext
from scone.head.utils import check_type, check_type_opt
logger = logging.getLogger(__name__)
@ -15,12 +15,10 @@ logger = logging.getLogger(__name__)
class LinuxUser(Recipe):
_NAME = "os-user"
def __init__(self, host: str, slug: str, args: dict, head: Head):
super().__init__(host, slug, args, head)
if slug[0] == "@":
raise ValueError("os-user should be used like [os-user.username].")
def __init__(self, recipe_context: RecipeContext, args: dict, head):
super().__init__(recipe_context, args, head)
self.user_name = slug
self.user_name = check_type(args.get("name"), str)
self.make_group = check_type(args.get("make_group", True), bool)
self.make_home = check_type(args.get("make_home", True), bool)
self.home: Optional[str] = check_type_opt(args.get("home"), str)
@ -62,3 +60,33 @@ class LinuxUser(Recipe):
self.make_group,
self.home,
)
class DeclareLinuxUser(Recipe):
_NAME = "declare-os-user"
def __init__(self, recipe_context: RecipeContext, args: dict, head):
super().__init__(recipe_context, args, head)
self.user_name = check_type(args.get("name"), str)
def prepare(self, preparation: Preparation, head: "Head") -> None:
preparation.provides("os-user", self.user_name)
async def cook(self, kitchen: Kitchen) -> None:
kitchen.get_dependency_tracker()
class DeclareLinuxGroup(Recipe):
_NAME = "declare-os-group"
def __init__(self, recipe_context: RecipeContext, args: dict, head):
super().__init__(recipe_context, args, head)
self.name = check_type(args.get("name"), str)
def prepare(self, preparation: Preparation, head: "Head") -> None:
preparation.provides("os-group", self.name)
async def cook(self, kitchen: Kitchen) -> None:
kitchen.get_dependency_tracker()

View File

@ -1,17 +1,17 @@
from scone.default.utensils.db_utensils import PostgresTransaction
from scone.head import Head, Recipe
from scone.head.kitchen import Kitchen
from scone.head.recipe import Preparation
from scone.head.head import Head
from scone.head.kitchen import Kitchen, Preparation
from scone.head.recipe import Recipe, RecipeContext
from scone.head.utils import check_type
class PostgresDatabase(Recipe):
_NAME = "pg-db"
def __init__(self, host: str, slug: str, args: dict, head: Head):
super().__init__(host, slug, args, head)
def __init__(self, recipe_context: RecipeContext, args: dict, head):
super().__init__(recipe_context, args, head)
self.database_name = slug
self.database_name = check_type(args.get("name"), str)
self.owner = check_type(args.get("owner"), str)
self.encoding = args.get("encoding", "utf8")
self.collate = args.get("collate", "en_GB.utf8")
@ -56,10 +56,10 @@ class PostgresDatabase(Recipe):
class PostgresUser(Recipe):
_NAME = "pg-user"
def __init__(self, host: str, slug: str, args: dict, head: Head):
super().__init__(host, slug, args, head)
def __init__(self, recipe_context: RecipeContext, args: dict, head):
super().__init__(recipe_context, args, head)
self.user_name = slug
self.user_name = check_type(args.get("name"), str)
self.password = check_type(args.get("password"), str)
def prepare(self, preparation: Preparation, head: Head) -> None:

View File

@ -1,12 +1,11 @@
from pathlib import Path
from typing import Tuple, List
from typing import List, Tuple
from scone.default.recipes.apt import AptPackage
from scone.default.steps.basic_steps import exec_no_fails
from scone.default.steps.filesystem_steps import depend_remote_file
from scone.head import Head, Recipe
from scone.head.kitchen import Kitchen
from scone.head.recipe import Preparation
from scone.head.head import Head
from scone.head.kitchen import Kitchen, Preparation
from scone.head.recipe import Recipe, RecipeContext
from scone.head.utils import check_type
@ -20,8 +19,8 @@ class PythonVenv(Recipe):
_NAME = "python-venv"
def __init__(self, host: str, slug: str, args: dict, head: Head):
super().__init__(host, slug, args, head)
def __init__(self, recipe_context: RecipeContext, args: dict, head):
super().__init__(recipe_context, args, head)
self.dir = check_type(args.get("dir"), str)
self.interpreter = check_type(args.get("interpreter"), str)
@ -39,31 +38,29 @@ class PythonVenv(Recipe):
def prepare(self, preparation: Preparation, head: Head):
super().prepare(preparation, head)
preparation.needs("dir", str(Path(self.dir).parent))
preparation.needs("directory", str(Path(self.dir).parent))
for name, flags in self.install:
if "-r" in flags:
preparation.needs("file", name)
elif "git" in flags or "dir" in flags:
preparation.needs("dir", name)
preparation.needs("directory", name)
final_script = str(Path(self.dir, "bin/python"))
preparation.provides("file", str(final_script))
if not self.no_apt_install:
preparation.subrecipe(
AptPackage(
self.get_host(), "@venv-apt", {"packages": ["python3-venv"]}, head
)
)
preparation.needs("apt-stage", "packages-installed")
# preparation.subrecipe(
# AptPackage(self.recipe_context, {"packages": ["python3-venv"]})
# )
# preparation.needs("apt-stage", "packages-installed")
preparation.needs("apt-package", "python3-venv")
async def cook(self, kitchen: Kitchen):
dt = kitchen.get_dependency_tracker()
await exec_no_fails(
kitchen, [self.interpreter, "-m", "venv", self.dir], "/"
)
await exec_no_fails(kitchen, [self.interpreter, "-m", "venv", self.dir], "/")
install_args = []
for name, flags in self.install:

View File

@ -2,9 +2,9 @@ from typing import Dict
from scone.default.recipes.filesystem import CommandOnChange
from scone.default.utensils.basic_utensils import SimpleExec
from scone.head import Head, Recipe
from scone.head.kitchen import Kitchen
from scone.head.recipe import Preparation
from scone.head.head import Head
from scone.head.kitchen import Kitchen, Preparation
from scone.head.recipe import Recipe, RecipeContext
from scone.head.utils import check_type, check_type_opt
@ -17,10 +17,11 @@ class SystemdUnit(Recipe):
daemon_reloaders: Dict[str, CommandOnChange] = {}
def __init__(self, host: str, slug: str, args: dict, head: Head):
super().__init__(host, slug, args, head)
def __init__(self, recipe_context: RecipeContext, args: dict, head):
super().__init__(recipe_context, args, head)
self.unit_name = slug if "." in slug else slug + ".service"
unit = check_type(args.get("unit"), str)
self.unit_name = unit if "." in unit else unit + ".service"
self.at = check_type(args.get("at"), str)
self.enabled = check_type_opt(args.get("enabled"), bool)
self.restart_on = check_type_opt(args.get("restart_on"), list)
@ -32,44 +33,49 @@ class SystemdUnit(Recipe):
if self.enabled is not None:
enable_recipe = SystemdEnabled(
self.get_host(),
self.unit_name,
{"enabled": self.enabled, "at": self.at, ".user": "root"},
head,
self.recipe_context,
{
"unit": self.unit_name,
"enabled": self.enabled,
"at": self.at,
".user": "root",
},
None,
)
preparation.subrecipe(enable_recipe)
preparation.needs("systemd-stage", "enabled")
daemon_reloader = SystemdUnit.daemon_reloaders.get(self.get_host(), None)
daemon_reloader = SystemdUnit.daemon_reloaders.get(
self.recipe_context.sous, None
)
if not daemon_reloader:
# TODO this should be replaced with a dedicated command which provides
# those units.
daemon_reloader = CommandOnChange(
self.get_host(),
"systemd-internal",
self.recipe_context,
{
"purpose": "systemd.daemon_reload",
"command": ["systemctl", "daemon-reload"],
"files": [],
".user": "root",
},
head,
None,
)
preparation.subrecipe(daemon_reloader)
file_list = getattr(daemon_reloader, "_args")["files"]
# file_list = getattr(daemon_reloader, "_args")["files"]
file_list = [] # TODO
file_list.append(self.at)
if self.restart_on:
service_reloader = CommandOnChange(
self.get_host(),
"systemd-internal",
self.recipe_context,
{
"purpose": "systemd.unit_reload",
"command": ["systemctl", "reload", self.unit_name],
"files": self.restart_on + [self.at],
".user": "root",
},
head,
None,
)
preparation.subrecipe(service_reloader)
@ -85,10 +91,11 @@ class SystemdEnabled(Recipe):
_NAME = "systemd-enabled"
def __init__(self, host: str, slug: str, args: dict, head: Head):
super().__init__(host, slug, args, head)
def __init__(self, recipe_context: RecipeContext, args: dict, head):
super().__init__(recipe_context, args, head)
self.unit_name = slug if "." in slug else slug + ".service"
unit = check_type(args.get("unit"), str)
self.unit_name = unit if "." in unit else unit + ".service"
self.at = check_type(args.get("at"), str)
self.enabled = check_type_opt(args.get("enabled"), bool)

View File

@ -2,9 +2,9 @@ from pathlib import PurePath
from typing import List, Optional, Union
from scone.default.utensils.basic_utensils import SimpleExec
from scone.head import Recipe
from scone.head.exceptions import CookingError
from scone.head.kitchen import Kitchen, current_recipe
from scone.head.recipe import Recipe
class ExecutionFailure(CookingError):
@ -47,8 +47,8 @@ async def exec_no_fails(
raise ExecutionFailure(
args,
working_dir,
recipe.get_host(),
recipe.get_user(kitchen.head),
recipe.recipe_context.sous,
recipe.recipe_context.user,
result,
)
else:

View File

@ -4,10 +4,9 @@ from typing import List, Optional, Tuple, Union
from jinja2 import Template
from scone.head import Head
from scone.head.head import Head
from scone.head.kitchen import Kitchen
SUPERMARKET_RELATIVE = ".scone-cache/supermarket"

View File

@ -1,201 +0,0 @@
import copy
import itertools
import logging
import re
import sys
from os import path
from pathlib import Path
from typing import Any, Dict, List, Optional, Tuple, cast
import toml
from nacl.encoding import URLSafeBase64Encoder
from scone.common.loader import ClassLoader
from scone.common.misc import eprint
from scone.common.pools import Pools
from scone.head import menu_reader
from scone.head.menu_reader import HostMenu, Menu
from scone.head.recipe import Recipe, recipe_name_getter
from scone.head.secrets import SecretAccess
from scone.head.variables import Variables, merge_right_into_left_inplace
logger = logging.getLogger(__name__)
class Head:
def __init__(
self,
directory: str,
recipe_loader: ClassLoader[Recipe],
menu: Menu,
sous: Dict[str, dict],
groups: Dict[str, List[str]],
secret_access: Optional[SecretAccess],
pools: Pools,
):
self.directory = directory
self.recipe_loader = recipe_loader
self.menu = menu
self.souss = sous
self.groups = groups
self.secret_access = secret_access
self.variables: Dict[str, Variables] = dict()
self.pools = pools
@staticmethod
def open(directory: str):
with open(path.join(directory, "scone.head.toml")) as head_toml:
head_data = toml.load(head_toml)
secret_access: Optional[SecretAccess] = None
if "freezer" in head_data and "restaurant_id" in head_data["freezer"]:
secret_access = SecretAccess(head_data["freezer"]["restaurant_id"])
secret_access.get_existing()
if not secret_access.key:
eprint("Failed to load freezer secret.")
sys.exit(12)
recipe_module_roots = head_data.get("recipe_roots", ["scone.default.recipes"])
# load available recipes
recipe_loader: ClassLoader[Recipe] = ClassLoader(Recipe, recipe_name_getter)
for recipe_root in recipe_module_roots:
recipe_loader.add_package_root(recipe_root)
sous = head_data.get("sous", dict())
groups = head_data.get("group", dict())
groups["all"] = list(sous.keys())
# load the menu
menu = menu_reader.parse_toml_menu_descriptors(path.join(directory, "menu"))
pools = Pools()
head = Head(directory, recipe_loader, menu, sous, groups, secret_access, pools)
head._load_variables()
return head
def _preload_variables(self, who_for: str) -> Tuple[dict, dict]:
out_frozen: Dict[str, Any] = {}
out_chilled: Dict[str, Any] = {}
vardir = Path(self.directory, "vars", who_for)
logger.debug("preloading vars for %s in %s", who_for, str(vardir))
for file in vardir.glob("*.vf.toml"):
if not file.is_file():
continue
with file.open() as var_file:
logger.debug("Opened %s for frozen vars", file)
frozen_vars = cast(Dict[Any, Any], toml.load(var_file))
merge_right_into_left_inplace(out_frozen, frozen_vars)
for file in vardir.glob("*.v.toml"):
if not file.is_file():
continue
with file.open() as var_file:
logger.debug("Opened %s for vars", file)
chilled_vars = cast(Dict[Any, Any], toml.load(var_file))
merge_right_into_left_inplace(out_chilled, chilled_vars)
to_transform = [out_frozen]
while to_transform:
next_dict = to_transform.pop()
for k, v in next_dict.items():
if isinstance(v, str):
b64_secret = re.sub(r"\s", "", v)
if not self.secret_access:
raise RuntimeError("Secret access disabled; cannot thaw.")
next_dict[k] = self.secret_access.decrypt_bytes(
b64_secret.encode(), encoder=URLSafeBase64Encoder
).decode()
elif isinstance(v, dict):
to_transform.append(v)
else:
raise ValueError(f"Not permitted in frozen variables file: '{v}'.")
return out_chilled, out_frozen
def _load_variables(self):
preload: Dict[str, Tuple[dict, dict]] = dict()
for who_name in itertools.chain(self.souss, self.groups):
preload[who_name] = self._preload_variables(who_name)
for sous_name in self.souss:
order = ["all"]
order += [
group
for group, members in self.groups.items()
if sous_name in members and group != "all"
]
order.append(sous_name)
chilled: Dict[str, Any] = {}
frozen: Dict[str, Any] = {}
for who_name in order:
in_chilled, in_frozen = preload[who_name]
merge_right_into_left_inplace(chilled, in_chilled)
merge_right_into_left_inplace(frozen, in_frozen)
sous_vars = Variables()
sous_vars.load_plain(frozen)
sous_vars.load_vars_with_substitutions(chilled)
self.variables[sous_name] = sous_vars
def _construct_hostmenu_for(
self, hostmenu: HostMenu, host: str, recipe_list: List[Recipe], head: "Head"
) -> None:
for recipe_id, dishes in hostmenu.dishes.items():
recipe_cls = self.recipe_loader.get_class(recipe_id)
if not recipe_cls:
raise RuntimeError(f"Unable to find recipe class for '{recipe_id}'.")
for slug, args in dishes.items():
args = copy.deepcopy(args)
self.variables[host].substitute_inplace_in_dict(args)
recipe = recipe_cls.from_menu(host, slug, args, head)
recipe_list.append(recipe)
def construct_recipes(self):
recipes = {}
for sous in self.souss:
logger.debug("Constructing recipes for %s", sous)
sous_recipe_list: List[Recipe] = []
# construct recipes for it only
sous_hm = self.menu.hostmenus.get(sous)
if sous_hm is not None:
self._construct_hostmenu_for(sous_hm, sous, sous_recipe_list, self)
# construct recipes for it that are for groups it is in
for group, members in self.groups.items():
if sous in members:
group_hm = self.menu.hostmenus.get(group)
if group_hm is not None:
self._construct_hostmenu_for(
group_hm, sous, sous_recipe_list, self
)
recipes[sous] = sous_recipe_list
logger.info("Constructed %d recipes for %s.", len(sous_recipe_list), sous)
return recipes
def debug_info(self) -> str:
lines = []
lines.append("Head Configuration")
lines.append(" Sous List")
for name, sous in self.souss.items():
lines.append(f" - {name} = {sous}")
lines.append("")
lines.append(" Sous Groups")
for name, group in self.groups.items():
lines.append(f" - {name} = {group}")
lines.append("")
lines += [" " + line for line in str(self.recipe_loader).splitlines()]
lines.append("")
lines += [" " + line for line in str(self.menu).splitlines()]
lines.append("")
return "\n".join(lines)

View File

@ -8,10 +8,10 @@ from pathlib import Path
from scone.common.misc import eprint
from scone.common.pools import Pools
from scone.head import Head
from scone.head.dependency_tracking import DependencyCache, run_dep_checks
from scone.head.kitchen import Kitchen
from scone.head.recipe import Preparation, Recipe
from scone.head import dot_emitter
from scone.head.dependency_tracking import DependencyCache
from scone.head.head import Head
from scone.head.kitchen import Kitchen, Preparation
def cli() -> None:
@ -64,43 +64,38 @@ async def cli_async() -> int:
eprint(f"Selected the following souss: {', '.join(hosts)}")
recipes_by_sous = head.construct_recipes()
recipes_to_do = []
for sous in hosts:
recipes_to_do += recipes_by_sous.get(sous, [])
eprint(f"Preparing {len(recipes_to_do)} recipes…")
prepare = Preparation(recipes_to_do)
eprint("Preparing recipes…")
prepare = Preparation(head)
start_ts = time.monotonic()
order = prepare.prepare(head)
notifying_provides = prepare.notifying_provides
prepare.prepare_all()
del prepare
end_ts = time.monotonic()
eprint(f"Preparation completed in {end_ts - start_ts:.3f} s.")
eprint(f"{len(order)} courses planned.")
# eprint(f"{len(order)} courses planned.")
dot_emitter.emit_dot(head.dag, Path(cdir, "dag.0.dot"))
dep_cache = await DependencyCache.open(
os.path.join(head.directory, "depcache.sqlite3")
)
eprint("Checking dependency cache…")
start_ts = time.monotonic()
depchecks = await run_dep_checks(head, dep_cache, order)
end_ts = time.monotonic()
eprint(f"Checking finished in {end_ts - start_ts:.3f} s.") # TODO show counts
for epoch, items in enumerate(order):
print(f"----- Course {epoch} -----")
for item in items:
if isinstance(item, Recipe):
state = depchecks[item].label.name
print(f" > recipe ({state}) {item}")
elif isinstance(item, tuple):
kind, ident, extra = item
print(f" - we now have {kind} {ident} {dict(extra)}")
# eprint("Checking dependency cache…")
# start_ts = time.monotonic()
# depchecks = await run_dep_checks(head, dep_cache, order)
# end_ts = time.monotonic()
# eprint(f"Checking finished in {end_ts - start_ts:.3f} s.") # TODO show counts
#
# for epoch, items in enumerate(order):
# print(f"----- Course {epoch} -----")
#
# for item in items:
# if isinstance(item, Recipe):
# state = depchecks[item].label.name
# print(f" > recipe ({state}) {item}")
# elif isinstance(item, tuple):
# kind, ident, extra = item
# print(f" - we now have {kind} {ident} {dict(extra)}")
eprint("Ready to cook? [y/N]: ", end="")
if argp.yes:
@ -110,16 +105,21 @@ async def cli_async() -> int:
eprint("Stopping.")
return 101
kitchen = Kitchen(head, dep_cache, notifying_provides)
kitchen = Kitchen(head, dep_cache)
for epoch, epoch_items in enumerate(order):
print(f"Cooking Course {epoch} of {len(order)}")
await kitchen.run_epoch(
epoch_items, depchecks, concurrency_limit_per_host=8
)
# for epoch, epoch_items in enumerate(order):
# print(f"Cooking Course {epoch} of {len(order)}")
# await kitchen.run_epoch(
# epoch_items, depchecks, concurrency_limit_per_host=8
# )
#
# for sous in hosts: TODO this is not definitely safe
# await dep_cache.sweep_old(sous)
for sous in hosts:
await dep_cache.sweep_old(sous)
try:
await kitchen.cook_all()
finally:
dot_emitter.emit_dot(head.dag, Path(cdir, "dag.9.dot"))
return 0
finally:

193
scone/head/dag.py Normal file
View File

@ -0,0 +1,193 @@
from collections import defaultdict
from enum import Enum
from typing import Dict, Optional, Set, Union
import attr
from frozendict import frozendict
from scone.head.recipe import Recipe
class RecipeState(Enum):
# Just loaded from menu, or otherwise created
LOADED = 0
# Has been prepared — we know its dependencies for this run
PREPARED = 1
# This recipe needs to be cooked, but may be blocked by dependencies
PENDING = 2
# This recipe is not blocked by any further
COOKABLE = 3
# This recipe is being cooked
BEING_COOKED = 4
# This recipe has been cooked!
COOKED = 5
# This recipe has not been cooked because it didn't need to be.
SKIPPED = 10
@staticmethod
def is_completed(state):
return state in (RecipeState.COOKED, RecipeState.SKIPPED)
@attr.s(auto_attribs=True)
class RecipeMeta:
"""
State of the recipe.
"""
state: RecipeState = RecipeState.LOADED
"""
Uncompleted incoming edge count.
"""
incoming_uncompleted: int = 0
@attr.s(auto_attribs=True, frozen=True)
class Resource:
"""
Resource kind.
"""
kind: str
"""
Resource ID
"""
id: str
"""
Resource sous, or None if it's on the head
"""
sous: Optional[str]
"""
Optional dict of extra parameters needed to disambiguate the resource,
though should only be used where necessary and sensible to do so.
"""
# extra_params: Optional[frozendict[str, str]] = None
extra_params: Optional[frozendict] = None
def __str__(self) -> str:
extra_str = "" if not self.extra_params else f" {self.extra_params!r}"
sous_str = "" if not self.sous else f" on {self.sous}"
return f"{self.kind}({self.id}){extra_str}{sous_str}"
@attr.s(auto_attribs=True)
class ResourceMeta:
"""
Whether the resource is completed or not.
A resource becomes completed when all its incoming edges are completed,
or it has no incoming edges and is not a hard need.
"""
completed: bool = False
"""
Uncompleted incoming edge count.
"""
incoming_uncompleted: int = 0
"""
Whether the resource is considered a hard need.
A resource is a hard need when we cannot proceed without something
providing it.
"""
hard_need: bool = False
Vertex = Union["Recipe", Resource]
class RecipeDag:
def __init__(self):
self.vertices: Set[Vertex] = set()
# edges go from A -> B where B needs A to run.
self.edges: Dict[Vertex, Set[Vertex]] = defaultdict(set)
self.reverse_edges: Dict[Vertex, Set[Vertex]] = defaultdict(set)
self.recipe_meta: Dict[Recipe, RecipeMeta] = dict()
self.resource_meta: Dict[Resource, ResourceMeta] = dict()
self.resource_time: Dict[Resource, int] = dict()
def add(self, vertex: Vertex):
self.vertices.add(vertex)
if isinstance(vertex, Recipe):
self.recipe_meta[vertex] = RecipeMeta()
elif isinstance(vertex, Resource):
self.resource_meta[vertex] = ResourceMeta()
def needs(
self, needer: "Recipe", resource: Resource, soft_wants: bool = False
) -> None:
if needer not in self.vertices:
raise ValueError(f"Needer {needer} not in vertices!")
if resource not in self.vertices:
self.add(resource)
if needer in self.edges[resource]:
return
self.edges[resource].add(needer)
self.reverse_edges[needer].add(resource)
needer_meta = self.recipe_meta[needer]
resource_meta = self.resource_meta[resource]
if not soft_wants:
resource_meta.hard_need = True
if not resource_meta.completed:
needer_meta.incoming_uncompleted += 1
def provides(self, provider: "Recipe", resource: Resource) -> None:
if provider not in self.vertices:
raise ValueError(f"Provider {provider} not in vertices!")
if resource not in self.vertices:
self.add(resource)
if resource in self.edges[provider]:
return
self.edges[provider].add(resource)
self.reverse_edges[resource].add(provider)
provider_meta = self.recipe_meta[provider]
resource_meta = self.resource_meta[resource]
if not RecipeState.is_completed(provider_meta.state):
resource_meta.incoming_uncompleted += 1
resource_meta.completed = False
else:
if resource_meta.incoming_uncompleted == 0:
resource_meta.completed = True
def add_ordering(self, before: "Recipe", after: "Recipe") -> None:
if before not in self.vertices:
raise ValueError(f"Before {before} not in vertices!")
if after not in self.vertices:
raise ValueError(f"After {after} not in vertices!")
after_meta = self.recipe_meta[after]
before_meta = self.recipe_meta[before]
if after in self.edges[before]:
return
self.edges[before].add(after)
self.reverse_edges[after].add(before)
if not RecipeState.is_completed(before_meta.state):
after_meta.incoming_uncompleted += 1
# TODO if after_meta.state ==
# TODO else ...

View File

@ -1,11 +1,8 @@
import asyncio
import json
import logging
import time
from asyncio import Queue
from enum import Enum
from hashlib import sha256
from typing import Any, Dict, List, NamedTuple, Optional, Tuple, Union
from typing import TYPE_CHECKING, Any, Dict, Optional, Tuple, Union
import aiosqlite
import attr
@ -13,10 +10,12 @@ import canonicaljson
import cattr
from aiosqlite import Connection
from scone.common.misc import sha256_file
from scone.common.pools import Pools
from scone.head import Head, Recipe, Variables, recipe_name_getter
from scone.head.recipe import DepEle
from scone.head.dag import Resource
from scone.head.recipe import recipe_name_getter
if TYPE_CHECKING:
from scone.head.dag import RecipeDag
from scone.head.recipe import Recipe
canonicaljson.set_json_library(json)
logger = logging.getLogger(__name__)
@ -24,9 +23,6 @@ logger = logging.getLogger(__name__)
# TODO(security, low): how to prevent passwords being recovered from the
# paramhashes in a dependency store?
# TODO(correctness, perf): recipes with @src@0 slugs should not be registered
# to a slug.
def _canonicalise_dict(input: Dict[str, Any]) -> Dict[str, Any]:
output: Dict[str, Any] = {}
@ -48,139 +44,104 @@ def hash_dict(value: dict) -> str:
).hexdigest()
def paramhash_recipe(recipe: Recipe) -> str:
args = getattr(recipe, "_args").copy()
del args[".source"]
return hash_dict(args)
def paramhash_recipe(recipe: "Recipe") -> str:
return hash_dict(
{
"args": recipe.arguments,
"sous": recipe.recipe_context.sous,
"user": recipe.recipe_context.user,
}
)
@attr.s(auto_attribs=True)
class DependencyBook:
var_names: List[str]
var_hash: str
fridge_hashes: Dict[str, str]
recipe_revisions: Dict[str, int]
dyn_sous_file_hashes: Dict[str, str]
provided: Dict[Resource, int] = dict()
watching: Dict[Resource, int] = dict()
last_changed: int = 0
cache_data: Dict[str, Any] = dict()
ignored: bool = False
async def can_skip_static(self, head: Head, recipe: Recipe) -> bool:
from scone.default.steps.fridge_steps import search_in_fridge
# TODO(performance, feature): track more in-depth details, perhaps as a
# per-resource cache thing, so that we can track the info needed to know
# if it changed...?
# start with variables
sous_vars = head.variables[recipe.get_host()]
var_comp = dict()
for var_name in self.var_names:
try:
var_comp[var_name] = sous_vars.get_dotted(var_name)
except KeyError:
return False
def _unstructure(self) -> dict:
return {
"provided": cattr.unstructure(tuple(self.provided.items())),
"watching": cattr.unstructure(tuple(self.watching.items())),
"last_changed": self.last_changed,
"cache_data": self.cache_data,
"ignored": self.ignored,
}
if hash_dict(var_comp) != self.var_hash:
return False
@staticmethod
def _structure(dictionary: dict) -> "DependencyBook":
provided = {cattr.structure(k, Resource): v for k, v in dictionary["provided"]}
watching = {cattr.structure(k, Resource): v for k, v in dictionary["watching"]}
# now we have to check files in the fridge
for fridge_name, expected_hash in self.fridge_hashes.items():
real_pathstr = search_in_fridge(head, fridge_name)
if not real_pathstr:
# vanished locally; that counts as a change
return False
real_hash = await asyncio.get_running_loop().run_in_executor(
head.pools.threaded, sha256_file, real_pathstr
)
if real_hash != expected_hash:
return False
return DependencyBook(
provided=provided,
watching=watching,
last_changed=dictionary["last_changed"],
cache_data=dictionary["cache_data"],
ignored=dictionary["ignored"],
)
return True
def has_dynamic(self) -> bool:
return len(self.dyn_sous_file_hashes) > 0
cattr.global_converter.register_unstructure_hook(
DependencyBook, DependencyBook._unstructure
)
cattr.global_converter.register_structure_hook(
DependencyBook, DependencyBook._structure
)
class DependencyTracker:
"""
Tracks the dependencies of a task and then inserts a row as needed.
"""
def __init__(self, book: DependencyBook, dag: "RecipeDag"):
self.book: DependencyBook = book
self._dag: RecipeDag = dag
self._time: int = int(time.time() * 1000)
def __init__(self, pools: Pools):
self._vars: Dict[str, Any] = {}
self._fridge: Dict[str, str] = {}
self._recipe_revisions: Dict[str, int] = {}
self._dyn_sous_files: Dict[str, str] = {}
self._ignored = False
self._pools = pools
def watch(self, resource: Resource) -> None:
# XXX self.book.watching[resource] = self._dag.resource_time[resource]
self.book.watching[resource] = -42
def ignore(self):
"""
Call when dependency tracking is not desired (or not advanced enough to
be useful.)
"""
self._ignored = True
def provide(self, resource: Resource, time: Optional[int] = None) -> None:
if time is None:
time = self._time
self._dag.resource_time[resource] = time
async def register_fridge_file(self, fridge_path: str, real_path: str):
if fridge_path not in self._fridge:
f_hash = await asyncio.get_running_loop().run_in_executor(
self._pools.threaded, sha256_file, real_path
)
self._fridge[fridge_path] = f_hash
def register_recipe(self, recipe: Recipe):
cls = recipe.__class__
rec_name = recipe_name_getter(cls)
if not rec_name:
return
self._recipe_revisions[rec_name] = getattr(cls, "_REVISION", None)
def ignore(self) -> None:
self.book.ignored = True
def register_variable(self, variable: str, value: Union[dict, str, int]):
self._vars[variable] = value
# self._vars[variable] = value
raise NotImplementedError("time")
def register_remote_file(self, file: str, file_hash: str):
self._dyn_sous_files[file] = file_hash
def register_fridge_file(self, path: str):
# TODO this is not complete
fridge_res = Resource("fridge", path, None)
self.watch(fridge_res)
def make_depbook(self) -> Optional[DependencyBook]:
if self._ignored:
return None
dep_book = DependencyBook(
list(self._vars.keys()),
hash_dict(self._vars),
self._fridge.copy(),
self._recipe_revisions,
self._dyn_sous_files,
)
return dep_book
def register_remote_file(self, path: str, sous: str):
# TODO this is not complete
file_res = Resource("file", path, sous=sous)
self.watch(file_res)
def get_j2_compatible_dep_var_proxies(
self, variables: Variables
) -> Dict[str, "DependencyVarProxy"]:
result = {}
for key, vars in variables.toplevel().items():
result[key] = DependencyVarProxy(self, vars, key + ".")
return result
class DependencyVarProxy:
"""
Provides convenient access to variables that also properly tracks
dependencies.
"""
def __init__(
self, dependency_tracker: DependencyTracker, variables: dict, prefix: str = ""
):
self._dvp_dt: DependencyTracker = dependency_tracker
self._dvp_prefix = prefix
self._dvp_vars = variables
def __getattr__(self, key: str):
fully_qualified_varname = self._dvp_prefix + key
value = self._dvp_vars.get(key, ...)
if value is ...:
raise KeyError(f"Variable does not exist: {fully_qualified_varname}")
elif isinstance(value, dict):
return DependencyVarProxy(self._dvp_dt, value, key + ".")
else:
self._dvp_dt.register_variable(fully_qualified_varname, value)
return value
# def get_j2_compatible_dep_var_proxies(
# self, variables: Variables
# ) -> Dict[str, "DependencyVarProxy"]:
# # XXX BROKEN does not work for overrides
# result = {}
#
# if len("1"):
# raise NotImplementedError("BROKEN")
#
# for key, vars in variables.toplevel().items():
# result[key] = DependencyVarProxy(self, vars, key + ".")
#
# return result
class DependencyCache:
@ -195,16 +156,14 @@ class DependencyCache:
await dc.db.executescript(
"""
CREATE TABLE IF NOT EXISTS dishcache (
source_file TEXT,
host TEXT,
recipe_id TEXT,
slug TEXT,
-- source_file TEXT,
recipe_kind TEXT,
paramhash TEXT,
dep_book TEXT,
ts INT,
PRIMARY KEY (source_file, host, recipe_id, slug, paramhash)
PRIMARY KEY (recipe_kind, paramhash)
);
CREATE INDEX IF NOT EXISTS dishcache_ts ON dishcache (ts);
-- CREATE INDEX IF NOT EXISTS dishcache_ts ON dishcache (ts);
"""
)
await dc.db.commit()
@ -223,25 +182,16 @@ class DependencyCache:
)
await self.db.commit()
async def inquire(self, recipe: Recipe) -> Optional[Tuple[int, DependencyBook]]:
async def inquire(self, recipe: "Recipe") -> Optional[Tuple[int, DependencyBook]]:
paramhash = paramhash_recipe(recipe)
rows = await self.db.execute_fetchall(
"""
SELECT rowid, dep_book FROM dishcache
WHERE source_file = ?
AND host = ?
AND recipe_id = ?
WHERE recipe_kind = ?
AND paramhash = ?
AND slug = ?
LIMIT 1
""",
(
recipe._args[".source"][0],
recipe.get_host(),
recipe_name_getter(recipe.__class__),
paramhash,
recipe._slug,
),
(recipe_name_getter(recipe.__class__), paramhash,),
)
rows = list(rows)
if not rows:
@ -259,23 +209,20 @@ class DependencyCache:
return rowid, dep_book
async def register(self, recipe: Recipe, dep_book: DependencyBook):
async def register(self, recipe: "Recipe", dep_book: DependencyBook):
paramhash = paramhash_recipe(recipe)
await self.db.execute(
"""
INSERT INTO dishcache
(source_file, host, recipe_id, slug, paramhash, dep_book, ts)
VALUES (?, ?, ?, ?, ?, ?, ?)
ON CONFLICT (source_file, host, recipe_id, paramhash, slug)
(recipe_kind, paramhash, dep_book, ts)
VALUES (?, ?, ?, ?)
ON CONFLICT (recipe_kind, paramhash)
DO UPDATE SET
dep_book = excluded.dep_book,
ts = excluded.ts
""",
(
recipe._args[".source"][0],
recipe.get_host(),
recipe_name_getter(recipe.__class__),
recipe._slug,
paramhash,
canonicaljson.encode_canonical_json(cattr.unstructure(dep_book)),
self.time,
@ -292,75 +239,3 @@ class DependencyCache:
(self.time, rowid),
)
await self.db.commit()
class CheckOutcomeLabel(Enum):
# Not in dependency cache, so must run.
NOT_CACHED = 0
# Dependency cache suggests we must rerun
MUST_REDO = 1
# Dependency cache suggests we are fine if dynamic dependencies haven't
# changed
CHECK_DYNAMIC = 2
# Dependency cache says we can skip; there are no dynamic dependencies
SAFE_TO_SKIP = 3
DepCheckOutcome = NamedTuple(
"DepCheckOutcome",
(("label", CheckOutcomeLabel), ("book", Optional[DependencyBook])),
)
async def run_dep_checks(
head: Head, dep_cache: DependencyCache, order: List[List[DepEle]]
) -> Dict[Recipe, DepCheckOutcome]:
queue: Queue[Optional[Recipe]] = Queue(32)
outcomes = {}
async def consumer():
while True:
recipe = await queue.get()
if not recipe:
break
t = await dep_cache.inquire(recipe)
if t:
# we need to check if dependencies have changed…
rowid, dep_book = t
if await dep_book.can_skip_static(head, recipe):
# we will renew either way
await dep_cache.renew(rowid)
if dep_book.has_dynamic():
# has dynamic dependencies
outcomes[recipe] = DepCheckOutcome(
CheckOutcomeLabel.CHECK_DYNAMIC, dep_book
)
else:
# can skip!
outcomes[recipe] = DepCheckOutcome(
CheckOutcomeLabel.SAFE_TO_SKIP, None
)
else:
outcomes[recipe] = DepCheckOutcome(
CheckOutcomeLabel.MUST_REDO, None
)
else:
outcomes[recipe] = DepCheckOutcome(CheckOutcomeLabel.NOT_CACHED, None)
queue.task_done()
async def producer():
for course in order:
for recipe in course:
if isinstance(recipe, Recipe):
await queue.put(recipe)
await queue.join()
for worker in consumers:
await queue.put(None)
consumers = [asyncio.create_task(consumer()) for _ in range(8)]
await asyncio.gather(*consumers, producer(), return_exceptions=False)
return outcomes

57
scone/head/dot_emitter.py Normal file
View File

@ -0,0 +1,57 @@
from pathlib import Path
from typing import Dict
from scone.head.dag import RecipeDag, RecipeState, Resource, Vertex
from scone.head.recipe import Recipe, recipe_name_getter
state_to_colour = {
RecipeState.LOADED: "#000000",
RecipeState.PREPARED: "azure",
RecipeState.PENDING: "pink",
RecipeState.COOKABLE: "gold",
RecipeState.COOKED: "darkolivegreen1",
RecipeState.SKIPPED: "cadetblue1",
RecipeState.BEING_COOKED: "darkorange1",
}
def emit_dot(dag: RecipeDag, path_out: Path) -> None:
with open(path_out, "w") as fout:
fout.write("digraph recipedag {\n")
ids: Dict[Vertex, str] = dict()
fout.write("\t// Vertices\n")
for idx, vertex in enumerate(dag.vertices):
vertex_id = f"v{idx}"
ids[vertex] = vertex_id
if isinstance(vertex, Recipe):
rec_meta = dag.recipe_meta[vertex]
label = (
f"{recipe_name_getter(vertex.__class__)}"
f" [{rec_meta.incoming_uncompleted}]"
)
colour = state_to_colour[rec_meta.state]
fout.write(
f'\t{vertex_id} [shape=box, label="{label}",'
f" style=filled, fillcolor={colour}];\n"
)
elif isinstance(vertex, Resource):
label = str(vertex).replace("\\", "\\\\").replace('"', '\\"')
res_meta = dag.resource_meta[vertex]
colour = "darkolivegreen1" if res_meta.completed else "pink"
fout.write(
f'\t{vertex_id} [label="{label}",'
f" style=filled, fillcolor={colour}];\n"
)
else:
raise ValueError(f"? vertex {vertex!r}")
fout.write("\n\t// Edges\n")
for from_vert, edges in dag.edges.items():
for to_vert in edges:
fout.write(f"\t{ids[from_vert]} -> {ids[to_vert]};\n")
fout.write("}\n")

163
scone/head/grammar/scoml.tx Normal file
View File

@ -0,0 +1,163 @@
// root
Unit:
Block
;
Comment:
/#.*$/
;
Block:
directives*=Directive
''
recipes+=RecipeOrSubBlock
;
RecipeOrSubBlock:
Recipe | SubBlock
;
SubBlock[ws=' \t']:
unique_id=ID '{' human=/.*$/ /\n/+
block=Block
'}'
;
Directive:
UserDirective | SousDirective | ForDirective | ImportDirective |
RecipeEdgeDirective | ResourceEdgeDirective | ListenEdgeDirective
;
UserDirective[ws=' \t']:
'@user' '=' user=ID /\n/+
;
SousDirective[ws=' \t']:
'@sous' '=' sous=ID /\n/+
;
ImportDirective[ws=' \t']:
'@import' importee=ID /\n/+
;
ForDirective[ws=' \t']:
'@for' loop_variable=DottedIdString 'in'
(
collection=DottedIdString /\n/+
|
':' /\n/
list=NaturalList
)
;
ResourceEdgeDirectiveKind:
'@needs' | '@wants' | '@provides'
;
ResourceEdgeDirective[ws=' \t']:
kind=ResourceEdgeDirectiveKind
resource=Resource
;
RecipeEdgeDirectiveKind:
'@after' | '@before'
;
RecipeEdgeDirective[ws=' \t']:
kind=RecipeEdgeDirectiveKind
':' id=ID
// TODO 'on other sous' ?
;
ListenEdgeDirectiveKind:
'@when' | '@only when'
;
ListenEdgeDirective[ws=' \t']:
kind=ListenEdgeDirectiveKind
(recipe_id=ID | resource=Resource)
'changes'
;
Resource:
type=ID '(' (primary=UnquotedString | primary=QuotedString) ')'
(extra_params=BraceDict)?
('on' sous=ID)?
;
NaturalList:
elements+=NaturalListElement
;
NaturalListElement[ws=' \t']:
//'-' item=KeyExpr /\n/+
'-' KeyExpr /\n/+
;
Recipe[ws=' \t']:
'[[' kind=DottedIdString (':' unique_id=DottedIdString)? ']]' human=/.*$/ /\n/+
directives*=Directive
args*=RecipeArgument
/\n*/
;
RecipeArgument[ws=' \t']:
name=ID
(
'=' value=ValueExpr /\n/+
|
':' /\n/
value=NaturalList
)
;
KeyExpr:
QuotedString | UnquotedString | Integer
;
ValueExpr:
QuotedString | Integer | Boolean | BracketList | BraceDict | UnquotedString
;
QuotedString:
value=STRING
;
UnquotedString:
value=/[^\s\n,"()0-9]([^\n,"()]*[^\s\n,"()])?/
;
DottedIdString:
/[a-zA-Z_-][a-zA-Z0-9_\.-]*/
;
Integer:
value=INT
;
Boolean:
value=BOOL
;
BracketList[ws=' \t\n']:
'['
items*=ValueExpr[',']
']'
;
BraceDict[ws=' \t']:
'{'
pairs*=DictPair[',']
'}'
;
DictPair:
(key=KeyExpr) '=' (value=ValueExpr)
;

209
scone/head/head.py Normal file
View File

@ -0,0 +1,209 @@
import itertools
import logging
import re
import sys
from os import path
from pathlib import Path
from typing import Any, Dict, Iterable, List, Optional, Tuple, cast
import toml
from nacl.encoding import URLSafeBase64Encoder
from scone.common.loader import ClassLoader
from scone.common.misc import eprint
from scone.common.pools import Pools
from scone.head.dag import RecipeDag
from scone.head.menu_reader import MenuLoader
from scone.head.recipe import Recipe, recipe_name_getter
from scone.head.secrets import SecretAccess
from scone.head.variables import Variables, merge_right_into_left_inplace
logger = logging.getLogger(__name__)
class Head:
def __init__(
self,
directory: str,
recipe_loader: ClassLoader[Recipe],
sous: Dict[str, dict],
groups: Dict[str, List[str]],
secret_access: Optional[SecretAccess],
pools: Pools,
):
self.directory = directory
self.recipe_loader = recipe_loader
self.dag = RecipeDag()
self.souss = sous
self.groups = groups
self.secret_access = secret_access
self.variables: Dict[str, Variables] = dict()
self.pools = pools
@staticmethod
def open(directory: str):
with open(path.join(directory, "scone.head.toml")) as head_toml:
head_data = toml.load(head_toml)
secret_access: Optional[SecretAccess] = None
if "freezer" in head_data and "restaurant_id" in head_data["freezer"]:
secret_access = SecretAccess(head_data["freezer"]["restaurant_id"])
secret_access.get_existing()
if not secret_access.key:
eprint("Failed to load freezer secret.")
sys.exit(12)
recipe_module_roots = head_data.get("recipe_roots", ["scone.default.recipes"])
# load available recipes
recipe_loader: ClassLoader[Recipe] = ClassLoader(Recipe, recipe_name_getter)
for recipe_root in recipe_module_roots:
recipe_loader.add_package_root(recipe_root)
sous = head_data.get("sous", dict())
groups = head_data.get("group", dict())
groups["all"] = list(sous.keys())
pools = Pools()
head = Head(directory, recipe_loader, sous, groups, secret_access, pools)
head._load_variables()
head._load_menus()
return head
def _preload_variables(self, who_for: str) -> Tuple[dict, dict]:
out_frozen: Dict[str, Any] = {}
out_chilled: Dict[str, Any] = {}
vardir = Path(self.directory, "vars", who_for)
logger.debug("preloading vars for %s in %s", who_for, str(vardir))
for file in vardir.glob("*.vf.toml"):
if not file.is_file():
continue
with file.open() as var_file:
logger.debug("Opened %s for frozen vars", file)
frozen_vars = cast(Dict[Any, Any], toml.load(var_file))
merge_right_into_left_inplace(out_frozen, frozen_vars)
for file in vardir.glob("*.v.toml"):
if not file.is_file():
continue
with file.open() as var_file:
logger.debug("Opened %s for vars", file)
chilled_vars = cast(Dict[Any, Any], toml.load(var_file))
merge_right_into_left_inplace(out_chilled, chilled_vars)
to_transform = [out_frozen]
while to_transform:
next_dict = to_transform.pop()
for k, v in next_dict.items():
if isinstance(v, str):
b64_secret = re.sub(r"\s", "", v)
if not self.secret_access:
raise RuntimeError("Secret access disabled; cannot thaw.")
next_dict[k] = self.secret_access.decrypt_bytes(
b64_secret.encode(), encoder=URLSafeBase64Encoder
).decode()
elif isinstance(v, dict):
to_transform.append(v)
else:
raise ValueError(f"Not permitted in frozen variables file: '{v}'.")
return out_chilled, out_frozen
def _load_variables(self):
preload: Dict[str, Tuple[dict, dict]] = dict()
for who_name in itertools.chain(self.souss, self.groups):
preload[who_name] = self._preload_variables(who_name)
for sous_name in self.souss:
order = ["all"]
order += [
group
for group, members in self.groups.items()
if sous_name in members and group != "all"
]
order.append(sous_name)
chilled: Dict[str, Any] = {}
frozen: Dict[str, Any] = {}
for who_name in order:
in_chilled, in_frozen = preload[who_name]
merge_right_into_left_inplace(chilled, in_chilled)
merge_right_into_left_inplace(frozen, in_frozen)
sous_vars = Variables(None)
sous_vars.load_plain(frozen)
sous_vars.load_vars_with_substitutions(chilled)
self.variables[sous_name] = sous_vars
def _load_menus(self):
loader = MenuLoader(Path(self.directory, "menu"), self)
loader.load_menus_in_dir()
loader.dagify_all()
# TODO remove
# def _construct_hostmenu_for(
# self, hostmenu: "HostMenu", host: str, recipe_list: List[Recipe], head: "Head"
# ) -> None:
# for recipe_id, dishes in hostmenu.dishes.items():
# recipe_cls = self.recipe_loader.get_class(recipe_id)
# if not recipe_cls:
# raise RuntimeError(f"Unable to find recipe class for '{recipe_id}'.")
# for slug, args in dishes.items():
# args = copy.deepcopy(args)
# self.variables[host].substitute_inplace_in_dict(args)
# recipe = recipe_cls.from_menu(host, slug, args, head)
# recipe_list.append(recipe)
#
# def construct_recipes(self):
# recipes = {}
# for sous in self.souss:
# logger.debug("Constructing recipes for %s", sous)
# sous_recipe_list: List[Recipe] = []
#
# # construct recipes for it only
# sous_hm = self.menu.hostmenus.get(sous)
# if sous_hm is not None:
# self._construct_hostmenu_for(sous_hm, sous, sous_recipe_list, self)
#
# # construct recipes for it that are for groups it is in
# for group, members in self.groups.items():
# if sous in members:
# group_hm = self.menu.hostmenus.get(group)
# if group_hm is not None:
# self._construct_hostmenu_for(
# group_hm, sous, sous_recipe_list, self
# )
# recipes[sous] = sous_recipe_list
# logger.info("Constructed %d recipes for %s.", len(sous_recipe_list), sous)
# return recipes
def debug_info(self) -> str:
lines = []
lines.append("Head Configuration")
lines.append(" Sous List")
for name, sous in self.souss.items():
lines.append(f" - {name} = {sous}")
lines.append("")
lines.append(" Sous Groups")
for name, group in self.groups.items():
lines.append(f" - {name} = {group}")
# lines.append("")
# lines += [" " + line for line in str(self.recipe_loader).splitlines()]
# lines.append("")
# lines += [" " + line for line in str(self.menu).splitlines()]
# lines.append("")
return "\n".join(lines)
def get_souss_for_hostspec(self, hostspec: str) -> Iterable[str]:
if hostspec in self.souss:
return (hostspec,)
else:
return self.groups[hostspec]

View File

@ -1,27 +1,27 @@
import asyncio
import logging
from asyncio import Future
from collections import defaultdict
from asyncio import Future, Queue
from collections import defaultdict, deque
from contextvars import ContextVar
from typing import Any, Coroutine, Dict, List, Optional, Tuple, Type, TypeVar
from typing import Any, Deque, Dict, Optional, Tuple, Type, TypeVar
import attr
import cattr
from frozendict import frozendict
from scone.common.chanpro import Channel, ChanProHead
from scone.default.utensils.dynamic_dependencies import CanSkipDynamic
from scone.head import Head, Recipe, sshconn
from scone.common.misc import eprint
from scone.head import sshconn
from scone.head.dag import RecipeMeta, RecipeState, Resource, Vertex
from scone.head.dependency_tracking import (
CheckOutcomeLabel,
DepCheckOutcome,
DependencyBook,
DependencyCache,
DependencyTracker,
)
from scone.head.recipe import DepEle, DependencySpec
from scone.head.head import Head
from scone.head.recipe import Recipe
from scone.sous import utensil_namer
from scone.sous.utensils import Utensil
logger = logging.getLogger(__name__)
current_recipe: ContextVar[Recipe] = ContextVar("current_recipe")
@ -29,21 +29,95 @@ current_recipe: ContextVar[Recipe] = ContextVar("current_recipe")
A = TypeVar("A")
class Preparation:
def __init__(self, head: Head):
self.dag = head.dag
self.head = head
self._queue: Deque[Tuple[Recipe, RecipeMeta]] = deque()
self._current_recipe: Optional[Recipe] = None
def needs(
self,
requirement: str,
identifier: str,
hard: bool = True,
sous: Optional[str] = "(self)",
**extra_identifiers: Any,
) -> None:
assert self._current_recipe is not None
if sous == "(self)":
sous = self._current_recipe.recipe_context.sous
resource = Resource(
requirement, identifier, sous, frozendict(extra_identifiers)
)
self.dag.needs(self._current_recipe, resource, not hard)
def wants(self, requirement: str, identifier: str, **extra_identifiers: Any):
return self.needs(requirement, identifier, hard=False, **extra_identifiers)
def provides(
self,
requirement: str,
identifier: str,
sous: Optional[str] = "(self)",
**extra_identifiers: Any,
) -> None:
assert self._current_recipe is not None
if sous == "(self)":
sous = self._current_recipe.recipe_context.sous
resource = Resource(
requirement, identifier, sous, frozendict(extra_identifiers)
)
self.dag.provides(self._current_recipe, resource)
def after(self, other_recipe: "Recipe"):
assert self._current_recipe is not None
self.dag.add_ordering(other_recipe, self._current_recipe)
def before(self, other_recipe: "Recipe"):
assert self._current_recipe is not None
self.dag.add_ordering(self._current_recipe, other_recipe)
def subrecipe(self, sub: "Recipe"):
self.dag.add(sub)
self._queue.append((sub, self.dag.recipe_meta[sub]))
def prepare_all(self) -> None:
for recipe in self.dag.vertices:
if not isinstance(recipe, Recipe):
continue
meta = self.dag.recipe_meta[recipe]
if meta.state != RecipeState.LOADED:
continue
self._queue.append((recipe, meta))
while self._queue:
recipe, meta = self._queue.popleft()
self._current_recipe = recipe
recipe.prepare(self, self.head)
self._current_recipe = None
meta.state = RecipeState.PREPARED
class Kitchen:
def __init__(
self,
head: Head,
dependency_store: DependencyCache,
notifying_provides: Dict[Recipe, List[DependencySpec]],
self, head: "Head", dependency_store: DependencyCache,
):
self._chanproheads: Dict[Tuple[str, str], Future[ChanProHead]] = dict()
self._dependency_store = dependency_store
self._dependency_trackers: Dict[Recipe, DependencyTracker] = defaultdict(
lambda: DependencyTracker(head.pools)
lambda: DependencyTracker(DependencyBook(), head.dag)
)
self.head = head
self._notifying_provides = notifying_provides
self.notifications: Dict[DependencySpec, bool] = dict()
self.last_updated_ats: Dict[Resource, int] = dict()
self._cookable: Queue[Optional[Vertex]] = Queue()
self._sleeper_slots: int = 0
def get_dependency_tracker(self):
return self._dependency_trackers[current_recipe.get()]
@ -61,7 +135,7 @@ class Kitchen:
None,
user,
connection_details["souscmd"],
connection_details.get("dangerous_debug_logging", False)
connection_details.get("dangerous_debug_logging", False),
)
except Exception:
logger.error("Failed to open SSH connection", exc_info=True)
@ -75,32 +149,121 @@ class Kitchen:
return await self._chanproheads[hostuser]
async def run_epoch(
self,
epoch: List[DepEle],
depchecks: Dict[Recipe, DepCheckOutcome],
concurrency_limit_per_host: int = 5,
):
per_host_lists: Dict[str, List[Recipe]] = defaultdict(lambda: [])
async def cook_all(self):
# TODO fridge emitter
# sort into per-host lists
for recipe in epoch:
if isinstance(recipe, Recipe):
if depchecks[recipe].label != CheckOutcomeLabel.SAFE_TO_SKIP:
per_host_lists[recipe.get_host()].append(recipe)
num_workers = 8
coros: List[Coroutine] = []
self._sleeper_slots = num_workers - 1
for host, recipes in per_host_lists.items():
host_work_pool = HostWorkPool(recipes, depchecks)
coros.append(host_work_pool.cook_all(self, concurrency_limit_per_host))
for vertex in self.head.dag.vertices:
if isinstance(vertex, Recipe):
rec_meta = self.head.dag.recipe_meta[vertex]
if rec_meta.incoming_uncompleted == 0:
rec_meta.state = RecipeState.COOKABLE
self._cookable.put_nowait(vertex)
else:
rec_meta.state = RecipeState.PENDING
elif isinstance(vertex, Resource):
res_meta = self.head.dag.resource_meta[vertex]
if res_meta.incoming_uncompleted == 0:
res_meta.completed = True
if res_meta.hard_need:
needers = self.head.dag.edges[vertex]
needers_str = "".join(f" - {n}\n" for n in needers)
raise RuntimeError(
f"Hard need 「{vertex}」 not satisfiable."
f" Needed by:\n{needers_str}"
)
self._cookable.put_nowait(vertex)
await asyncio.gather(*coros, return_exceptions=False)
workers = []
for _ in range(num_workers):
workers.append(self._cooking_worker())
await asyncio.gather(*workers, return_exceptions=False)
async def _cooking_worker(self):
dag = self.head.dag
while True:
if self._sleeper_slots <= 0 and self._cookable.empty():
self._sleeper_slots -= 1
self._cookable.put_nowait(None)
break
self._sleeper_slots -= 1
try:
next_job = await self._cookable.get()
finally:
self._sleeper_slots += 1
if next_job is None:
continue
if isinstance(next_job, Recipe):
meta = dag.recipe_meta[next_job]
# TODO try to deduplicate
meta.state = RecipeState.BEING_COOKED
current_recipe.set(next_job)
eprint(f"cooking {next_job}")
await next_job.cook(self)
eprint(f"cooked {next_job}")
# TODO cook
# TODO store depbook
await self._store_dependency(next_job)
meta.state = RecipeState.COOKED
elif isinstance(next_job, Resource):
eprint(f"have {next_job}")
pass
for edge in dag.edges[next_job]:
logger.debug("updating edge: %s%s", next_job, edge)
if isinstance(edge, Recipe):
rec_meta = dag.recipe_meta[edge]
rec_meta.incoming_uncompleted -= 1
logger.debug("has %d incoming", rec_meta.incoming_uncompleted)
if (
rec_meta.incoming_uncompleted == 0
and rec_meta.state == RecipeState.PENDING
):
rec_meta.state = RecipeState.COOKABLE
self._cookable.put_nowait(edge)
elif isinstance(edge, Resource):
res_meta = dag.resource_meta[edge]
res_meta.incoming_uncompleted -= 1
logger.debug("has %d incoming", res_meta.incoming_uncompleted)
if res_meta.incoming_uncompleted == 0 and not res_meta.completed:
res_meta.completed = True
self._cookable.put_nowait(edge)
# async def run_epoch(
# self,
# epoch: List[DepEle],
# depchecks: Dict[Recipe, DepCheckOutcome],
# concurrency_limit_per_host: int = 5,
# ):
# per_host_lists: Dict[str, List[Recipe]] = defaultdict(lambda: [])
#
# # sort into per-host lists
# for recipe in epoch:
# if isinstance(recipe, Recipe):
# if depchecks[recipe].label != CheckOutcomeLabel.SAFE_TO_SKIP:
# per_host_lists[recipe.get_host()].append(recipe)
#
# coros: List[Coroutine] = []
#
# for host, recipes in per_host_lists.items():
# host_work_pool = HostWorkPool(recipes, depchecks)
# coros.append(host_work_pool.cook_all(self, concurrency_limit_per_host))
#
# await asyncio.gather(*coros, return_exceptions=False)
async def start(self, utensil: Utensil) -> Channel:
utensil_name = utensil_namer(utensil.__class__)
recipe = current_recipe.get()
cph = await self.get_chanprohead(recipe.get_host(), recipe.get_user(self.head))
context = recipe.recipe_context
cph = await self.get_chanprohead(context.sous, context.user)
# noinspection PyDataclass
payload = cattr.unstructure(utensil)
@ -143,50 +306,51 @@ class Kitchen:
dependency_tracker = self._dependency_trackers.pop(recipe, None)
if not dependency_tracker:
raise KeyError(f"Recipe {recipe} has not been tracked.")
depbook = dependency_tracker.make_depbook()
depbook = dependency_tracker.book
if depbook:
await self._dependency_store.register(recipe, depbook)
@attr.s(auto_attribs=True)
class HostWorkPool:
jobs: List[Recipe]
depchecks: Dict[Recipe, DepCheckOutcome]
next_job: int = 0
async def cook_all(self, kitchen: Kitchen, concurrency_limit: int):
num_jobs = len(self.jobs)
concurrency_limit = min(num_jobs, concurrency_limit)
async def cooker():
while self.next_job < num_jobs:
recipe = self.jobs[self.next_job]
self.next_job += 1
current_recipe.set(recipe)
depcheck = self.depchecks.get(recipe)
if (
depcheck is not None
and depcheck.label == CheckOutcomeLabel.CHECK_DYNAMIC
):
book = depcheck.book
assert book is not None
can_skip = await kitchen.ut1(
CanSkipDynamic(book.dyn_sous_file_hashes)
)
if can_skip:
continue
await recipe.cook(kitchen)
# if successful, store dependencies
await kitchen._store_dependency(recipe)
nps = kitchen._notifying_provides.get(recipe, None)
if nps:
for depspec in nps:
if depspec not in kitchen.notifications:
# default to changed if not told otherwise
kitchen.notifications[depspec] = True
await asyncio.gather(
*[asyncio.create_task(cooker()) for _ in range(concurrency_limit)]
)
#
# @attr.s(auto_attribs=True)
# class HostWorkPool:
# jobs: List[Recipe]
# depchecks: Dict[Recipe, DepCheckOutcome]
# next_job: int = 0
#
# async def cook_all(self, kitchen: Kitchen, concurrency_limit: int):
# num_jobs = len(self.jobs)
# concurrency_limit = min(num_jobs, concurrency_limit)
#
# async def cooker():
# while self.next_job < num_jobs:
# recipe = self.jobs[self.next_job]
# self.next_job += 1
#
# current_recipe.set(recipe)
# depcheck = self.depchecks.get(recipe)
# if (
# depcheck is not None
# and depcheck.label == CheckOutcomeLabel.CHECK_DYNAMIC
# ):
# book = depcheck.book
# assert book is not None
# can_skip = await kitchen.ut1(
# CanSkipDynamic(book.dyn_sous_file_hashes)
# )
# if can_skip:
# continue
#
# await recipe.cook(kitchen)
# # if successful, store dependencies
# await kitchen._store_dependency(recipe)
# nps = kitchen._notifying_provides.get(recipe, None)
# if nps:
# for depspec in nps:
# if depspec not in kitchen.notifications:
# # default to changed if not told otherwise
# kitchen.notifications[depspec] = True
#
# await asyncio.gather(
# *[asyncio.create_task(cooker()) for _ in range(concurrency_limit)]
# )

View File

@ -1,125 +1,469 @@
import logging
import os
from os import path
import typing
from collections import defaultdict
from pathlib import Path
from typing import Any, Dict
from typing import Any, Dict, Iterable, List, Optional, Tuple, Union
import toml
import attr
import textx
from scone.head.dag import RecipeDag, Resource
from scone.head.recipe import RecipeContext
if typing.TYPE_CHECKING:
from scone.head.head import Head
from scone.head.recipe import Recipe
from scone.head.variables import Variables
class Menu:
def __init__(self):
self.hostmenus = {}
def get_host(self, name: str):
if name in self.hostmenus:
return self.hostmenus[name]
else:
new = HostMenu()
self.hostmenus[name] = new
return new
def __str__(self):
lines = ["Menu"]
for hostspec, hostmenu in self.hostmenus.items():
lines.append(f" on {hostspec} :-")
lines += [" " + line for line in str(hostmenu).split("\n")]
lines.append("")
return "\n".join(lines)
def _load_grammar():
grammar_file_path = Path(Path(__file__).parent, "grammar", "scoml.tx")
return textx.metamodel_from_file(grammar_file_path)
class HostMenu:
def __init__(self):
self.dishes = {}
scoml_grammar = _load_grammar()
scoml_classes = scoml_grammar.namespaces["scoml"]
def __str__(self):
lines = ["Menu"]
for recipe, dishes in self.dishes.items():
lines.append(f"- recipe {recipe}")
lines += [f" - {slug} {args}" for slug, args in dishes.items()]
lines.append("")
return "\n".join(lines)
logger = logging.getLogger(__name__)
def parse_toml_menu_descriptor(
filename: str, menu: Menu, default_hostspec: str, source_name: str = None
) -> None:
source_name = source_name or filename
@attr.s(auto_attribs=True)
class ForDirective:
"""
For loop_variable in collection
"""
with open(filename, "r") as f:
menu_desc: Dict[str, Any] = toml.load(f) # type: ignore
# The name of the variable that should be taken on by the iteration
loop_variable: str
if "-----" in menu_desc:
magic_tweaks = menu_desc["-----"]
del menu_desc["-----"]
# List of literals or str for a variable (by name)
collection: Union[str, List[Any]]
@attr.s(auto_attribs=True)
class RecipeEdgeDirective:
# "after" or "before"
kind: str
recipe_id: str
@attr.s(auto_attribs=True)
class ResourceEdgeDirective:
# "needs", "wants" or "provides"
kind: str
resource: Resource
@attr.s(auto_attribs=True)
class MenuBlock:
id: Optional[None]
human: str
contents: List[Union["MenuBlock", "MenuRecipe"]]
parent: Optional["MenuBlock"]
user_directive: Optional[str] = None
sous_directive: Optional[str] = None
for_directives: List[ForDirective] = []
import_directives: List[str] = []
recipe_edges: List[RecipeEdgeDirective] = []
resource_edges: List[ResourceEdgeDirective] = []
@attr.s(auto_attribs=True, eq=False)
class MenuRecipe:
kind: str
id: Optional[str]
human: str
arguments: Dict[str, Any]
parent: MenuBlock
user_directive: Optional[str] = None
sous_directive: Optional[str] = None
for_directives: List[ForDirective] = []
recipe_edges: List[RecipeEdgeDirective] = []
resource_edges: List[ResourceEdgeDirective] = []
def convert_textx_value(txvalue) -> Any:
if isinstance(txvalue, scoml_classes["NaturalList"]):
return [convert_textx_value(element) for element in txvalue.elements]
elif (
isinstance(txvalue, scoml_classes["QuotedString"])
or isinstance(txvalue, scoml_classes["UnquotedString"])
or isinstance(txvalue, scoml_classes["Integer"])
or isinstance(txvalue, scoml_classes["Boolean"])
):
return txvalue.value
elif isinstance(txvalue, scoml_classes["BracketList"]):
return [convert_textx_value(item) for item in txvalue.items]
elif isinstance(txvalue, scoml_classes["BraceDict"]):
result = dict()
for pair in txvalue.pairs:
result[convert_textx_value(pair.key)] = convert_textx_value(pair.value)
else:
magic_tweaks = {}
raise ValueError(f"Unknown SCOML value: {txvalue}")
for key, dishes in menu_desc.items():
# print(key, "=", dishes)
key_parts = key.split("--")
lkp = len(key_parts)
if lkp == 1:
# pg-db.synapse
hostspec = default_hostspec
recipe = key_parts[0]
elif lkp == 2:
if key_parts[1] == "":
# fridge-copy--
hostspec = default_hostspec
recipe = key_parts[0]
def convert_textx_recipe(txrecipe_or_subblock, parent: Optional[MenuBlock]):
if isinstance(txrecipe_or_subblock, scoml_classes["SubBlock"]):
txsubblock = txrecipe_or_subblock
menu_block = convert_textx_block(txsubblock.block, parent)
menu_block.id = txsubblock.unique_id
menu_block.human = txsubblock.human.strip()
return menu_block
elif isinstance(txrecipe_or_subblock, scoml_classes["Recipe"]):
assert parent is not None
txrecipe = txrecipe_or_subblock
args = dict()
for arg in txrecipe.args:
args[arg.name] = convert_textx_value(arg.value)
recipe = MenuRecipe(
txrecipe.kind, txrecipe.unique_id, txrecipe.human.strip(), args, parent
)
for directive in txrecipe.directives:
if isinstance(directive, scoml_classes["UserDirective"]):
recipe.user_directive = directive.user
elif isinstance(directive, scoml_classes["SousDirective"]):
recipe.user_directive = directive.sous
else:
# server1--pg-db.synapse
hostspec = key_parts[0]
recipe = key_parts[1]
elif lkp == 3 and key_parts[2] == "":
# server2--fridge-copy--
hostspec = key_parts[0]
recipe = key_parts[1]
raise ValueError(f"Unknown directive {directive}")
return recipe
else:
raise ValueError("Neither Recipe nor SubBlock: " + str(txrecipe_or_subblock))
def convert_textx_resource(txresource) -> Resource:
extra_params = None
if txresource.extra_params is not None:
extra_params = convert_textx_value(txresource.extra_params)
sous: Optional[str] = "(self)" # XXX docstring to warn about this
if txresource.sous:
if txresource.sous == "head":
sous = None
else:
raise ValueError(f"Don't understand key: {key}")
sous = txresource.sous
hostmenu = menu.get_host(hostspec)
if recipe in hostmenu.dishes:
mdishes = hostmenu.dishes[recipe]
return Resource(txresource.type, txresource.primary, sous, extra_params)
def convert_textx_block(txblock, parent: Optional[MenuBlock]) -> MenuBlock:
recipes: List[Union[MenuRecipe, MenuBlock]] = []
block = MenuBlock(None, "", recipes, parent)
for recipe in txblock.recipes:
recipes.append(convert_textx_recipe(recipe, block))
for directive in txblock.directives:
if isinstance(directive, scoml_classes["UserDirective"]):
# TODO(expectation): error if multiple user directives
block.user_directive = directive.user
elif isinstance(directive, scoml_classes["SousDirective"]):
block.sous_directive = directive.sous
elif isinstance(directive, scoml_classes["ForDirective"]):
block.for_directives.append(
ForDirective(
directive.loop_variable,
directive.collection or convert_textx_value(directive.list),
)
)
elif isinstance(directive, scoml_classes["ImportDirective"]):
block.import_directives.append(directive.importee)
elif isinstance(directive, scoml_classes["ResourceEdgeDirective"]):
block.resource_edges.append(
ResourceEdgeDirective(
directive.kind, convert_textx_resource(directive.resource)
)
)
elif isinstance(directive, scoml_classes["RecipeEdgeDirective"]):
block.recipe_edges.append(RecipeEdgeDirective(directive.kind, directive.id))
else:
mdishes = {}
hostmenu.dishes[recipe] = mdishes
raise ValueError(f"Unknown directive {directive}")
if isinstance(dishes, dict):
for slug, args in dishes.items():
if slug in mdishes:
raise ValueError(
f"Conflict in: Host {hostspec} Recipe {recipe} Dish Slug {slug}"
)
mdishes[slug] = args
args[".source"] = (source_name, key, slug)
args[".m"] = magic_tweaks
elif isinstance(dishes, list):
for idx, args in enumerate(dishes):
slug = f"@{source_name}@{idx}"
if slug in mdishes:
raise ValueError(
f"Conflict in: Host {hostspec} Recipe {recipe} Dish Slug {slug}"
)
mdishes[slug] = args
args[".source"] = (source_name, key, idx)
args[".m"] = magic_tweaks
return block
def parse_toml_menu_descriptors(menu_dir: str) -> Menu:
menu = Menu()
for root, dirs, files in os.walk(menu_dir):
for file in files:
full_path = path.join(root, file)
if file.endswith(".toml"):
SousName = str
ForLoopIndices = Tuple[int, ...]
SingleRecipeInvocationKey = Tuple[SousName, ForLoopIndices]
class MenuLoader:
def __init__(self, menu_dir: Path, head: "Head"):
self._menu_dir: Path = menu_dir
self._units: Dict[str, MenuBlock] = dict()
self._recipes: Dict[
MenuRecipe, Dict[SingleRecipeInvocationKey, Recipe]
] = defaultdict(dict)
self._dag: RecipeDag = head.dag
self._head = head
@staticmethod
def _load_menu_unit(full_path: Path, relative: str) -> MenuBlock:
model = scoml_grammar.model_from_file(full_path)
return convert_textx_block(model, None)
def load(self, unit_name: str):
if unit_name in self._units:
return
full_path = Path(self._menu_dir, unit_name + ".scoml")
menu_block = self._load_menu_unit(full_path, unit_name)
self._units[unit_name] = menu_block
for unit in menu_block.import_directives:
self.load(unit)
def resolve_ref(
self, referrer: Union[MenuBlock, MenuRecipe], reference: str
) -> Optional[Union[MenuBlock, MenuRecipe]]:
"""
Resolves a recipe or block reference
:param referrer: recipe or block making the reference that needs to be resolved
:param reference: string reference that needs to be resolved
:return: If found, the menu block or recipe that was referenced.
"""
# TODO(feature): need to think about scoping rules and then figure
# this one out
return None
def _get_first_common_ancestor(
self, one: Union[MenuBlock, MenuRecipe], other: Union[MenuBlock, MenuRecipe]
) -> Optional[MenuBlock]:
ancestors_of_a = set()
a: Optional[Union[MenuBlock, MenuRecipe]] = one
b: Optional[Union[MenuBlock, MenuRecipe]] = other
while a:
ancestors_of_a.add(a)
a = a.parent
while b:
if b in ancestors_of_a:
assert isinstance(b, MenuBlock)
return b
b = b.parent
return None
def get_related_instances(
self,
sous: str,
referrer_indices: Tuple[int, ...],
referrer: Union[MenuBlock, MenuRecipe],
menu_recipe: MenuRecipe,
) -> List["Recipe"]:
result = []
first_common_ancestor = self._get_first_common_ancestor(referrer, menu_recipe)
a: Union[MenuBlock, MenuRecipe] = referrer
strip = 0
while a != first_common_ancestor:
strip += len(a.for_directives)
parent = a.parent
assert parent is not None
a = parent
a = menu_recipe
extra = 0
while a != first_common_ancestor:
extra += len(a.for_directives)
parent = a.parent
assert parent is not None
a = parent
for (instance_sous, indices), recipe in self._recipes[menu_recipe].items():
if sous != instance_sous:
continue
if len(referrer_indices) - strip + extra == len(indices):
if referrer_indices[:-strip] == indices[:-extra]:
result.append(recipe)
else:
logger.warning(
"Mismatch in indices length %r - %d + %d ~/~ %r",
referrer_indices,
strip,
extra,
indices,
)
return result
def dagify_recipe(
self,
recipe: MenuRecipe,
hierarchical_source: str,
fors: Tuple[ForDirective, ...],
applicable_souss: Iterable[str],
applicable_user: Optional[str],
):
recipe_class = self._head.recipe_loader.get_class(recipe.kind)
fors = fors + tuple(recipe.for_directives)
if recipe.user_directive:
applicable_user = recipe.user_directive
if recipe.sous_directive:
applicable_souss = self._head.get_souss_for_hostspec(recipe.sous_directive)
for sous in applicable_souss:
if not applicable_user:
applicable_user = self._head.souss[sous]["user"]
assert applicable_user is not None
sous_vars = self._head.variables[sous]
for _vars, for_indices in self._for_apply(fors, sous_vars, tuple()):
context = RecipeContext(
sous=sous,
user=applicable_user,
slug=recipe.id,
hierarchical_source=hierarchical_source, # XXX
human=recipe.human,
)
args = recipe.arguments # noqa
# XXX sub in vars
instance: Recipe = recipe_class.new(
context, recipe.arguments, self._head
)
self._recipes[recipe][(sous, for_indices)] = instance
self._dag.add(instance)
def dagify_block(
self,
block: MenuBlock,
hierarchical_source: str,
fors: Tuple[ForDirective, ...],
applicable_souss: Iterable[str],
applicable_user: Optional[str],
):
fors = fors + tuple(block.for_directives)
if block.user_directive:
applicable_user = block.user_directive
if block.sous_directive:
applicable_souss = self._head.get_souss_for_hostspec(block.sous_directive)
for content in block.contents:
if isinstance(content, MenuBlock):
block_name = content.id or "?"
self.dagify_block(
content,
f"{hierarchical_source}.{block_name}",
fors,
applicable_souss,
applicable_user,
)
elif isinstance(content, MenuRecipe):
self.dagify_recipe(
content,
hierarchical_source,
fors,
applicable_souss,
applicable_user,
)
else:
raise ValueError(f"{content}?")
def postdagify_recipe(
self,
recipe: MenuRecipe,
fors: Tuple[ForDirective, ...],
applicable_souss: Iterable[str],
):
# add fors
fors = fors + tuple(recipe.for_directives)
if recipe.sous_directive:
applicable_souss = self._head.get_souss_for_hostspec(recipe.sous_directive)
for sous in applicable_souss:
sous_vars = self._head.variables[sous]
for _vars, for_indices in self._for_apply(fors, sous_vars, tuple()):
instance = self._recipes[recipe][(sous, for_indices)] # noqa
# XXX apply specific edges here including those from parent
def postdagify_block(
self,
block: MenuBlock,
fors: Tuple[ForDirective, ...],
applicable_souss: Iterable[str],
):
# XXX pass down specific edges here
fors = fors + tuple(block.for_directives)
if block.sous_directive:
applicable_souss = self._head.get_souss_for_hostspec(block.sous_directive)
for content in block.contents:
if isinstance(content, MenuBlock):
self.postdagify_block(content, fors, applicable_souss)
elif isinstance(content, MenuRecipe):
self.postdagify_recipe(content, fors, applicable_souss)
else:
raise ValueError(f"{content}?")
def dagify_all(self):
for name, unit in self._units.items():
self.dagify_block(
unit, name, tuple(), self._head.get_souss_for_hostspec("all"), None
)
for _name, unit in self._units.items():
self.postdagify_block(
unit, tuple(), self._head.get_souss_for_hostspec("all")
)
def _for_apply(
self, fors: Tuple[ForDirective, ...], vars: "Variables", accum: Tuple[int, ...]
) -> Iterable[Tuple["Variables", Tuple[int, ...]]]:
if not fors:
yield vars, accum
return
head = fors[0]
tail = fors[1:]
to_iter = head.collection
if isinstance(to_iter, str):
to_iter = vars.get_dotted(to_iter)
if not isinstance(to_iter, list):
raise ValueError(f"to_iter = {to_iter!r} not a list")
for idx, item in enumerate(to_iter):
new_vars = Variables(vars)
new_vars.set_dotted(head.loop_variable, item)
yield from self._for_apply(tail, new_vars, accum + (idx,))
def load_menus_in_dir(self) -> RecipeDag:
dag = RecipeDag()
for root, dirs, files in os.walk(self._menu_dir):
for file in files:
if not file.endswith(".scoml"):
continue
# full_path = Path(root, file)
# load this as a menu file
pieces = file.split(".")
default_hostspec = pieces[-2]
relative = str(Path(full_path).relative_to(menu_dir))
parse_toml_menu_descriptor(full_path, menu, default_hostspec, relative)
assert len(pieces) == 2
self.load(pieces[0])
return menu
return dag

View File

@ -1,100 +1,11 @@
import typing
from collections import defaultdict
from typing import Any, Dict, List, Optional, Set, Tuple, Union
from typing import Any, Dict, Optional
import toposort
import attr
if typing.TYPE_CHECKING:
from scone.head import Head
from scone.head.kitchen import Kitchen
DependencySpec = Tuple[str, str, frozenset]
DepEle = Union["Recipe", DependencySpec]
class Preparation:
"""
Preparation on a single host.
This is done before we start deduplicating (could this be improved?).
"""
def __init__(self, recipes: List["Recipe"]):
self._to_process = recipes.copy()
self.recipes = recipes
self._dependencies: Dict[DepEle, Set[DepEle]] = {}
self._recipe_now: Optional[Recipe] = None
self._hard_needs: Set[DependencySpec] = set()
self.notifying_provides: Dict[Recipe, List[DependencySpec]] = defaultdict(
lambda: []
)
def _make_depspec_tuple(
self, requirement: str, identifier: str, **extra_identifiers: Any
) -> DependencySpec:
if "host" not in extra_identifiers:
assert self._recipe_now is not None
extra_identifiers["host"] = self._recipe_now.get_host()
return requirement, identifier, frozenset(extra_identifiers.items())
def needs(
self,
requirement: str,
identifier: str,
hard: bool = False,
**extra_identifiers: Any,
) -> None:
assert self._recipe_now is not None
if self._recipe_now not in self._dependencies:
self._dependencies[self._recipe_now] = set()
depspec_tuple = self._make_depspec_tuple(
requirement, identifier, **extra_identifiers
)
self._dependencies[self._recipe_now].add(depspec_tuple)
if hard:
self._hard_needs.add(depspec_tuple)
def provides(
self,
requirement: str,
identifier: str,
notifying: bool = False,
**extra_identifiers: Any,
) -> None:
assert self._recipe_now is not None
depspec_tuple = self._make_depspec_tuple(
requirement, identifier, **extra_identifiers
)
if depspec_tuple not in self._dependencies:
self._dependencies[depspec_tuple] = set()
self._dependencies[depspec_tuple].add(self._recipe_now)
if notifying:
self.notifying_provides[self._recipe_now].append(depspec_tuple)
def subrecipe(self, recipe: "Recipe"):
assert self._recipe_now is not None
self._to_process.append(recipe)
self.recipes.append(recipe)
args = getattr(recipe, "_args")
if ".source" not in args:
file, key, slug = getattr(self._recipe_now, "_args")[".source"]
args[".source"] = (file, key + "-sub", slug)
def prepare(self, head: "Head") -> List[List]:
while self._to_process:
next_recipe = self._to_process.pop()
self._recipe_now = next_recipe
next_recipe.prepare(self, head)
for hard_need in self._hard_needs:
if hard_need not in self._dependencies:
raise ValueError(f"Hard need not satisfied (no entry): {hard_need}")
if not self._dependencies[hard_need]:
raise ValueError(f"Hard need not satisfied (empty): {hard_need}")
self._dependencies[self._make_depspec_tuple(".internal", "completed")] = set(
self.recipes
)
return list(toposort.toposort(self._dependencies))
from scone.head.head import Head
from scone.head.kitchen import Kitchen, Preparation
def recipe_name_getter(c: typing.Type["Recipe"]) -> Optional[str]:
@ -103,60 +14,36 @@ def recipe_name_getter(c: typing.Type["Recipe"]) -> Optional[str]:
return None
@attr.s(auto_attribs=True)
class RecipeContext:
sous: str
user: str
slug: Optional[str]
hierarchical_source: Optional[str]
human: str
class Recipe:
def __init__(self, host: str, slug: str, args: dict, head: "Head"):
self._host = host
self._slug = slug
self._args = args
def get_host(self):
return self._host
def get_tweak(self, name: str, default: Any) -> Any:
dotname = f".{name}"
if dotname in self._args:
return self._args[dotname]
elif ".m" in self._args and name in self._args[".m"]:
return self._args[".m"][name]
else:
return default
def get_user(self, head: "Head") -> str:
user = self.get_tweak("user", head.souss[self._host]["user"])
assert isinstance(user, str)
return user
def __init__(
self, recipe_context: RecipeContext, args: Dict[str, Any], head: "Head"
):
self.recipe_context = recipe_context
self.arguments = args
@classmethod
def from_menu(cls, host: str, slug: str, args: dict, head: "Head") -> "Recipe":
return cls(host, slug, args, head)
def new(cls, recipe_context: RecipeContext, args: Dict[str, Any], head: "Head"):
return cls(recipe_context, args, head)
def prepare(self, preparation: Preparation, head: "Head") -> None:
preparation.needs("os-user", self.get_user(head))
def prepare(self, preparation: "Preparation", head: "Head") -> None:
preparation.needs("os-user", self.recipe_context.user)
# TODO(feature) allow merging per-task and per-menu tweaks
# TODO(feature) allow need/provide custom things, not just user-units
afters = self.get_tweak("needs", None)
if afters:
for after in afters:
if isinstance(after, list) and len(after) == 2:
# allow requesting custom needs
preparation.needs(after[0], after[1])
continue
if not isinstance(after, str):
raise ValueError("needs tweak should be list of strings or pairs.")
preparation.needs("user-unit", after)
befores = self.get_tweak("provides", None)
if befores:
if isinstance(befores, str):
preparation.provides("user-unit", befores)
else:
for before in befores:
if not isinstance(before, str):
raise ValueError("provides tweak should be list of strings.")
preparation.provides("user-unit", before)
async def cook(self, kitchen: "Kitchen") -> None:
raise NotImplementedError
@ -164,8 +51,12 @@ class Recipe:
cls = self.__class__
if hasattr(cls, "RECIPE_NAME"):
return (
f"{cls.RECIPE_NAME}({cls.__name__}) {self._slug} " # type: ignore
f"on {self._host} ({self._args})"
f"{cls.RECIPE_NAME}({cls.__name__})" # type: ignore
f" {self.recipe_context.human} "
f"on {self.recipe_context.sous} ({self.arguments})"
)
else:
return f"{cls.__name__} {self._slug} on {self._host} ({self._args})"
return (
f"{cls.__name__} {self.recipe_context.human}"
f" on {self.recipe_context.sous} ({self.arguments})"
)

View File

@ -27,7 +27,7 @@ async def open_ssh_sous(
client_key: Optional[str],
requested_user: str,
sous_command: str,
debug_logging: bool = False
debug_logging: bool = False,
) -> Tuple[ChanPro, Channel]:
if client_key:
opts = SSHClientConnectionOptions(username=user, client_keys=[client_key])
@ -42,8 +42,11 @@ async def open_ssh_sous(
command = sous_command
if debug_logging:
command = f"tee /tmp/sconnyin-{requested_user} | {command} 2>/tmp/sconnyerr-{requested_user} " \
f"| tee /tmp/sconnyout-{requested_user}"
command = (
f"tee /tmp/sconnyin-{requested_user} "
f"| {command} 2>/tmp/sconnyerr-{requested_user} "
f"| tee /tmp/sconnyout-{requested_user}"
)
process: SSHClientProcess = await conn.create_process(command, encoding=None)

View File

@ -1,5 +1,5 @@
from enum import Enum
from typing import Any, Dict, List, NamedTuple
from typing import Any, Dict, List, NamedTuple, Optional
ExpressionPart = NamedTuple("ExpressionPart", [("kind", str), ("value", str)])
@ -84,8 +84,9 @@ def merge_right_into_left_inplace(left: dict, right: dict):
class Variables:
def __init__(self):
def __init__(self, delegate: Optional["Variables"]):
self._vars: Dict[str, Any] = {}
self._delegate: Optional[Variables] = delegate
def get_dotted(self, name: str) -> Any:
current = self._vars
@ -95,6 +96,8 @@ class Variables:
current = current[k]
return current
except KeyError:
if self._delegate:
return self._delegate.get_dotted(name)
raise KeyError("No variable: " + name)
def has_dotted(self, name: str) -> bool:
@ -102,6 +105,8 @@ class Variables:
self.get_dotted(name)
return True
except KeyError:
if self._delegate:
return self._delegate.has_dotted(name)
return False
def set_dotted(self, name: str, value: Any):

View File

@ -27,7 +27,8 @@ REQUIRED = [
"toml~=0.10.1",
"attrs~=19.3.0",
"cattrs~=1.0.0",
"canonicaljson~=1.2.0"
"canonicaljson~=1.2.0",
"immutabledict==1.0.0"
]