Initial commit before I start messing around

This commit is contained in:
Olivier 'reivilibre' 2020-09-24 18:53:31 +01:00
commit 328c99924c
49 changed files with 3924 additions and 0 deletions

23
mypy.ini Normal file
View File

@ -0,0 +1,23 @@
[mypy]
check_untyped_defs = True
[mypy-cbor2]
ignore_missing_imports = True
[mypy-toposort]
ignore_missing_imports = True
[mypy-asyncssh]
ignore_missing_imports = True
[mypy-nacl.*]
ignore_missing_imports = True
[mypy-secretstorage]
ignore_missing_imports = True
[mypy-canonicaljson]
ignore_missing_imports = True
[mypy-asyncpg]
ignore_missing_imports = True

1
requirements.txt Normal file
View File

@ -0,0 +1 @@
-e .

0
scone/__init__.py Normal file
View File

55
scone/__main__.py Normal file
View File

@ -0,0 +1,55 @@
# import asyncio
# import itertools
# import sys
# from typing import List
#
# from scone.head import Head, Recipe
# from scone.head.kitchen import Kitchen
# from scone.head.recipe import Preparation
# def main(args=None):
# if args is None:
# args = sys.argv[1:]
#
# if len(args) < 1:
# raise RuntimeError("Needs to be passed a sous config directory as 1st arg!")
#
# print("Am I a head?")
#
# head = Head.open(args[0])
#
# print(head.debug_info())
#
# recipes_by_sous = head.construct_recipes()
#
# all_recipes: List[Recipe] = list(
# itertools.chain.from_iterable(recipes_by_sous.values())
# )
#
# prepare = Preparation(all_recipes)
# order = prepare.prepare(head)
#
# for epoch, items in enumerate(order):
# print(f"----- Course {epoch} -----")
#
# for item in items:
# if isinstance(item, Recipe):
# print(f" > recipe {item}")
# elif isinstance(item, tuple):
# kind, ident, extra = item
# print(f" - we now have {kind} {ident} {dict(extra)}")
#
# print("Starting run")
#
# k = Kitchen(head)
#
# async def cook():
# for epoch, epoch_items in enumerate(order):
# print(f"Cooking Course {epoch} of {len(order)}")
# await k.run_epoch(epoch_items)
#
# asyncio.get_event_loop().run_until_complete(cook())
#
#
# if __name__ == "__main__":
# main()

0
scone/common/__init__.py Normal file
View File

172
scone/common/chanpro.py Normal file
View File

@ -0,0 +1,172 @@
import asyncio
import logging
import struct
import sys
from asyncio import Queue, Task
from asyncio.streams import FlowControlMixin, StreamReader, StreamWriter
from typing import Any, Dict, Optional
import attr
import cattr
import cbor2
SIZE_FORMAT = "!I"
logger = logging.getLogger(__name__)
class ChanPro:
def __init__(self, in_stream: StreamReader, out_stream: StreamWriter):
self._in = in_stream
self._out = out_stream
self._channels: Dict[int, "Channel"] = {}
self._listener: Optional[Task] = None
async def close(self) -> None:
# TODO cancel _listener?
pass
@staticmethod
async def open_from_stdio() -> "ChanPro":
loop = asyncio.get_event_loop()
reader = asyncio.StreamReader()
reader_protocol = asyncio.StreamReaderProtocol(reader)
await loop.connect_read_pipe(lambda: reader_protocol, sys.stdin.buffer)
writer_transport, writer_protocol = await loop.connect_write_pipe(
FlowControlMixin, sys.stdout.buffer
)
writer = StreamWriter(writer_transport, writer_protocol, None, loop)
return ChanPro(reader, writer)
async def _send_dict(self, dictionary: dict):
encoded = cbor2.dumps(dictionary)
encoded_len = struct.pack(SIZE_FORMAT, len(encoded))
self._out.write(encoded_len)
self._out.write(encoded)
await self._out.drain()
async def _recv_dict(self) -> dict:
size = struct.calcsize(SIZE_FORMAT)
encoded_len = await self._in.readexactly(size)
(length,) = struct.unpack(SIZE_FORMAT, encoded_len)
encoded = await self._in.readexactly(length)
return cbor2.loads(encoded)
def new_channel(self, number: int, desc: str):
if number in self._channels:
channel = self._channels[number]
raise ValueError(f"Channel {number} already in use ({channel}).")
channel = Channel(number, desc, self)
self._channels[number] = channel
return channel
async def send_message(self, channel: int, payload: Any):
await self._send_dict({"c": channel, "p": payload})
async def send_close(self, channel: int, reason: str = None):
# TODO extend with error indication capability (remote throw) ?
# TODO might want to wait until other end closed?
await self._send_dict({"c": channel, "close": True, "reason": reason})
def start_listening_to_channels(self, default_route: Optional["Channel"]):
async def channel_listener():
idx = 0
while True:
message = await self._recv_dict()
logger.debug("<message> %d %r", idx, message)
idx += 1
await self.handle_incoming_message(message, default_route=default_route)
self._listener = asyncio.create_task(
channel_listener() # py 3.8 , name="chanpro channel listener"
)
async def handle_incoming_message(
self, message: dict, default_route: Optional["Channel"] = None
):
if "c" not in message:
logger.warning("Received message without channel number.")
channel_num = message["c"]
channel = self._channels.get(channel_num)
if not channel:
if default_route:
await default_route._queue.put({"lost": message})
else:
logger.warning(
"Received message about non-existent channel number %r.",
channel_num,
)
return
# XXX todo send msg, what about shutdown too?
if "p" in message:
# payload on channel
await channel._queue.put(message["p"])
elif "close" in message:
channel._closed = True
await channel._queue.put(None)
else:
raise ValueError(f"Unknown channel message with keys {message.keys()}")
class Channel:
    """One numbered logical stream multiplexed over a ChanPro connection."""

    def __init__(self, number: int, desc: str, chanpro: ChanPro):
        self.number = number
        self.description = desc
        self.chanpro = chanpro
        # Incoming payloads; a None entry is the close sentinel.
        self._queue: Queue[Any] = Queue()
        self._closed = False

    def __str__(self):
        return f"Channel №{self.number} ({self.description})"

    async def send(self, payload: Any):
        """Send a payload, unstructuring attrs classes into plain data first."""
        to_send = payload
        if attr.has(to_send.__class__):
            to_send = cattr.unstructure(to_send)
        await self.chanpro.send_message(self.number, to_send)

    async def recv(self) -> Any:
        """Receive the next payload; EOFError once drained and closed."""
        if self._closed and self._queue.empty():
            raise EOFError("Channel closed.")
        item = await self._queue.get()
        # A trailing None on a closed, drained channel signals EOF.
        if item is None and self._closed and self._queue.empty():
            raise EOFError("Channel closed.")
        return item

    async def close(self, reason: str = None):
        """Close this end and notify the peer; later calls are no-ops."""
        if self._closed:
            return
        self._closed = True
        await self._queue.put(None)
        await self.chanpro.send_close(self.number, reason)

    async def wait_close(self):
        """Expect closure: raise if a payload arrives before EOF."""
        try:
            await self.recv()
        except EOFError:
            # expected
            return
        raise RuntimeError("Message arrived when expecting closure.")

    async def consume(self) -> Any:
        """
        Consume the last item of the channel and assert closure.

        The last item is returned.
        """
        final = await self.recv()
        await self.wait_close()
        return final
class ChanProHead:
    """Head-side helper that allocates command channels with sequential ids."""

    def __init__(self, chanpro: ChanPro, channel0: Channel):
        self._chanpro = chanpro
        self._channel0 = channel0
        # Channel 0 is the control channel, so command channels start at 1.
        self._next_channel_id = 1

    async def start_command_channel(self, command: str, payload: Any) -> Channel:
        """Open a fresh channel and announce the command over channel 0."""
        allocated = self._chanpro.new_channel(self._next_channel_id, command)
        self._next_channel_id += 1
        announcement = {"nc": allocated.number, "cmd": command, "pay": payload}
        await self._channel0.send(announcement)
        return allocated

45
scone/common/loader.py Normal file
View File

@ -0,0 +1,45 @@
import importlib
import pkgutil
from inspect import isclass
from typing import Any, Callable, Dict, Generic, Optional, TypeVar
T = TypeVar("T")
class ClassLoader(Generic[T]):
    """
    Discovers and registers subclasses of a given base class across packages.

    Registration names come from `name_getter`; classes for which it returns
    None are skipped.
    """

    def __init__(self, clarse: Any, name_getter: Callable[[Any], Optional[str]]):
        self._class = clarse
        # registration name → class found during scanning
        self._classes: Dict[str, Callable[[str, str, dict], T]] = dict()
        self._name_getter = name_getter

    def add_package_root(self, module_root: str):
        """Import `module_root` and scan it plus all subpackages/submodules."""
        root = importlib.import_module(module_root)
        self._add_module(root)
        for info in pkgutil.iter_modules(root.__path__):  # type: ignore
            qualified = module_root + "." + info.name
            if info.ispkg:
                # recurse into subpackage
                self.add_package_root(qualified)
            else:
                self._add_module(importlib.import_module(qualified))

    def _add_module(self, module):
        """Register every matching class found among the module's attributes."""
        for attr_name in dir(module):
            candidate = getattr(module, attr_name)
            if not (isclass(candidate) and issubclass(candidate, self._class)):
                continue
            reg_name = self._name_getter(candidate)
            if reg_name is not None:
                self._classes[reg_name] = candidate

    def get_class(self, name: str):
        """Look up a registered class by name; None if unknown."""
        return self._classes.get(name)

    def __str__(self) -> str:
        lines = ["Generic Loader. Loaded stuff:"]
        lines.extend(
            f" - {recipe_name} from {recipe_class.__module__}"
            for recipe_name, recipe_class in self._classes.items()
        )
        return "\n".join(lines)

42
scone/common/misc.py Normal file
View File

@ -0,0 +1,42 @@
import os
import sys
from hashlib import sha256
def eprint(*args, **kwargs):
    """Like print(), but always writes to standard error (overrides file=)."""
    print(*args, **{**kwargs, "file": sys.stderr})
def sha256_dir(path: str) -> str:
    """
    Hash a directory tree deterministically.

    Each entry is hashed (recursively for subdirectories) and the results are
    combined in sorted name order as NUL-separated name/hash pairs.
    """
    entry_hashes = {}
    with os.scandir(path) as entries:
        for entry in entries:
            if entry.is_dir():
                entry_hashes[entry.name] = sha256_dir(entry.path)
            else:
                entry_hashes[entry.name] = sha256_file(entry.path)
    combined = b""
    for name, digest in sorted(entry_hashes.items()):
        combined += name.encode() + b"\0" + digest.encode() + b"\0"
    return sha256_bytes(combined)
def sha256_file(path: str) -> str:
    """Return the hex SHA-256 digest of the file at `path`, read in 8 MiB chunks."""
    hasher = sha256()
    with open(path, "rb") as fin:
        for chunk in iter(lambda: fin.read(8192 * 1024), b""):
            hasher.update(chunk)
    return hasher.hexdigest()
def sha256_bytes(data: bytes) -> str:
    """Return the hex SHA-256 digest of `data`."""
    digest = sha256()
    digest.update(data)
    return digest.hexdigest()

62
scone/common/modeutils.py Normal file
View File

@ -0,0 +1,62 @@
import re
from typing import Union
# Opinionated default modes for personal use.
# Security conscious but also reasonable.
DEFAULT_MODE_FILE = 0o660
DEFAULT_MODE_DIR = 0o775
def parse_mode(mode_code: Union[str, int], directory: bool) -> int:
    """
    Convert a symbolic chmod-style mode string (e.g. "u=rw,go=r") to an int.

    Integer inputs are returned untouched. `directory` controls whether the
    special "X" flag grants execute permission in this simplified model.
    Raises ValueError for an unparseable clause.
    """
    if isinstance(mode_code, int):
        return mode_code

    value_bits = {"r": 0o4, "w": 0o2, "x": 0o1}
    # Multipliers shift a 3-bit rwx value into the right octal digit(s).
    who_mult = {"u": 0o100, "g": 0o010, "o": 0o001, "a": 0o111}

    mode = 0
    for clause in mode_code.split(","):
        match = re.fullmatch(
            r"(?P<affected>[ugoa]+)(?P<op>[-+=])(?P<value>[rwxXst]*)", clause
        )
        if match is None:
            raise ValueError(f"Did not understand mode string {clause}")

        affected = set(match.group("affected"))
        op = match.group("op")
        values = set(match.group("value"))

        # "X" grants execute only when targeting a directory here.
        if "X" in values:
            values.remove("X")
            if directory:
                values.add("x")

        mult = 0
        for who in affected:
            mult |= who_mult[who]

        clause_bits = 0
        for flag in values:
            if flag in ("r", "w", "x"):
                clause_bits |= value_bits[flag] * mult
            elif flag == "s":
                if "u" in affected:
                    clause_bits |= 0o4000  # setuid
                if "g" in affected:
                    clause_bits |= 0o2000  # setgid
            elif flag == "t":
                clause_bits |= 0o1000  # sticky

        if op == "=":
            # OR with clause_bits allows setting suid, sgid and sticky.
            mode &= ~((mult * 0o7) | clause_bits)
            mode |= clause_bits
        elif op == "+":
            mode |= clause_bits
        elif op == "-":
            mode &= ~clause_bits
        else:
            # unreachable: the regex restricts op to [-+=]
            raise RuntimeError("op not [-+=].")
    return mode

20
scone/common/pools.py Normal file
View File

@ -0,0 +1,20 @@
from concurrent.futures.process import ProcessPoolExecutor
from concurrent.futures.thread import ThreadPoolExecutor
class Pools:
    """Lazily-created singleton holding shared thread and process executors."""

    _instance = None

    def __init__(self):
        self.threaded = ThreadPoolExecutor()
        self.process = ProcessPoolExecutor()

    @staticmethod
    def get():
        """Return the shared Pools instance, creating it on first use."""
        if Pools._instance is None:
            Pools._instance = Pools()
        return Pools._instance

    def shutdown(self):
        """Shut down both executors, waiting for running work to finish."""
        self.threaded.shutdown()
        self.process.shutdown()

View File

View File

View File

@ -0,0 +1,88 @@
from typing import Dict, List, Set, Tuple
from scone.default.utensils.basic_utensils import SimpleExec
from scone.head import Head, Recipe
from scone.head.kitchen import Kitchen
from scone.head.recipe import Preparation
from scone.head.utils import check_type
class AptInstallInternal(Recipe):
    """
    Actually installs the packages; does it in a single batch for efficiency!
    """

    _NAME = "apt-install.internal"

    # TODO(extension, low): expand this into apt-install-now if we need
    # the flexibility

    def __init__(self, host: str, slug: str, args: dict, head: "Head"):
        super().__init__(host, slug, args, head)
        # Shared mutable set that AptPackage instances add their packages to.
        self.packages: Set[str] = set()
        args["packages"] = self.packages
        args[".source"] = ("@virtual", "apt-install-internal", "the one true AII")

    def get_user(self, head: "Head") -> str:
        # package installation always runs as root
        return "root"

    def prepare(self, preparation: Preparation, head: "Head") -> None:
        super().prepare(preparation, head)
        preparation.needs("apt-stage", "internal-install-packages")
        preparation.needs("apt-stage", "repositories-declared")
        preparation.provides("apt-stage", "packages-installed")

    async def cook(self, kitchen: Kitchen) -> None:
        # apt-installs built up the args to represent what was needed, so this
        # will work as-is
        kitchen.get_dependency_tracker()

        if not self.packages:
            return

        update = await kitchen.ut1areq(
            SimpleExec(["apt-get", "-yq", "update"], "/"), SimpleExec.Result
        )
        if update.exit_code != 0:
            raise RuntimeError(
                f"apt update failed with err {update.exit_code}: {update.stderr!r}"
            )

        install = await kitchen.ut1areq(
            SimpleExec(["apt-get", "-yq", "install"] + list(self.packages), "/"),
            SimpleExec.Result,
        )
        if install.exit_code != 0:
            raise RuntimeError(
                f"apt install failed with err {install.exit_code}:"
                f" {install.stderr!r}"
            )
class AptPackage(Recipe):
    """Requests apt packages; installation is batched via AptInstallInternal."""

    _NAME = "apt-install"

    # One shared internal installer per (head, host) pair.
    internal_installers: Dict[Tuple[Head, str], AptInstallInternal] = {}

    def __init__(self, host: str, slug: str, args: dict, head: Head):
        super().__init__(host, slug, args, head)
        self.packages: List[str] = check_type(args["packages"], list)

    def prepare(self, preparation: Preparation, head: Head) -> None:
        super().prepare(preparation, head)
        key = (head, self.get_host())
        installer = AptPackage.internal_installers.get(key)
        if installer is None:
            # first apt-install on this (head, host): create the batcher
            installer = AptInstallInternal(self.get_host(), "internal", {}, head)
            AptPackage.internal_installers[key] = installer
            preparation.subrecipe(installer)
            preparation.provides("apt-stage", "internal-install-packages")
        installer.packages.update(self.packages)

    async def cook(self, kitchen: Kitchen) -> None:
        # can't be tracked
        kitchen.get_dependency_tracker().ignore()

View File

@ -0,0 +1,322 @@
from pathlib import Path
from typing import List
from scone.common.modeutils import DEFAULT_MODE_DIR, parse_mode
from scone.default.steps.basic_steps import exec_no_fails
from scone.default.steps.filesystem_steps import depend_remote_file
from scone.default.utensils.basic_utensils import (
Chmod,
Chown,
MakeDirectory,
SimpleExec,
Stat,
)
from scone.default.utensils.dynamic_dependencies import HasChangedInSousStore
from scone.head import Head, Recipe
from scone.head.kitchen import Kitchen
from scone.head.recipe import Preparation
from scone.head.utils import check_type, check_type_opt
class DeclareFile(Recipe):
    """
    Declares that a file already exists on the sous.

    Maybe we will assert it in the future?
    """

    _NAME = "declare-file"

    def prepare(self, preparation: Preparation, head: Head):
        # satisfy other recipes' needs("file", path) without doing any work
        preparation.provides("file", self._args["path"])

    async def cook(self, kitchen: Kitchen):
        # mark as tracked.
        kitchen.get_dependency_tracker()
class DeclareDirectory(Recipe):
    """
    Declares that a directory already exists on the sous.

    Maybe we will assert it in the future?
    """

    _NAME = "declare-dir"

    def prepare(self, preparation: Preparation, head: Head):
        # satisfy other recipes' needs("directory", path) without doing work
        preparation.provides("directory", self._args["path"])

    async def cook(self, kitchen: Kitchen):
        # mark as tracked.
        kitchen.get_dependency_tracker()
class EnsureDirectory(Recipe):
    """
    Makes a directory tree.
    """

    _NAME = "directory"

    def __init__(self, host: str, slug: str, args: dict, head: "Head"):
        super().__init__(host, slug, args, head)
        parents = args.get("parents", 0)
        assert isinstance(parents, int)
        path = args.get("path")
        assert isinstance(path, str)
        mode = args.get("mode", DEFAULT_MODE_DIR)
        assert isinstance(mode, str) or isinstance(mode, int)
        self.path = path
        # how many ancestor directories this recipe may also create
        self.parents = parents
        self.mode = parse_mode(mode, directory=True)
        # directories to create, ordered outermost-first (filled in prepare())
        self._make: List[str] = []
        self.targ_user = args.get("owner", self.get_user(head))
        self.targ_group = args.get("group", self.targ_user)

    def prepare(self, preparation: Preparation, head: "Head"):
        super().prepare(preparation, head)
        preparation.needs("os-user", self.targ_user)
        preparation.needs("os-group", self.targ_group)
        preparation.provides("directory", self.path)
        self._make.append(self.path)
        parent = Path(self.path).parent
        # each of the `parents` ancestors is also provided by this recipe…
        for _ in range(self.parents):
            self._make.append(str(parent))
            preparation.provides("directory", str(parent))
            parent = parent.parent
        # …but the next ancestor beyond that must already exist
        preparation.needs("directory", str(parent))
        # create outermost (closest to root) directories first
        self._make.reverse()

    async def cook(self, k: Kitchen):
        for directory in self._make:
            stat = await k.ut1a(Stat(directory), Stat.Result)
            if stat is None:
                # doesn't exist, make it
                await k.ut0(MakeDirectory(directory, self.mode))
                stat = await k.ut1a(Stat(directory), Stat.Result)
                if stat is None:
                    raise RuntimeError("Directory vanished after creation!")
            if stat.dir:
                # correct ownership and mode of pre-existing directories too
                if (stat.user, stat.group) != (self.targ_user, self.targ_group):
                    # need to chown
                    await k.ut0(Chown(directory, self.targ_user, self.targ_group))
                if stat.mode != self.mode:
                    await k.ut0(Chmod(directory, self.mode))
            else:
                raise RuntimeError("Already exists but not a dir: " + directory)
        # mark as tracked.
        k.get_dependency_tracker()
class ExtractTar(Recipe):
    """
    Extracts a tar archive, expecting to get at least some files.
    """

    _NAME = "tar-extract"

    def __init__(self, host: str, slug: str, args: dict, head: "Head"):
        super().__init__(host, slug, args, head)
        self.tar = check_type(args.get("tar"), str)
        self.dir = check_type(args.get("dir"), str)
        self.expect_files = check_type(args.get("expects_files"), List[str])

    def prepare(self, preparation: Preparation, head: "Head"):
        super().prepare(preparation, head)
        preparation.needs("file", self.tar)
        preparation.needs("directory", self.dir)
        # every expected member (relative to dir) is provided by extraction
        for expected in self.expect_files:
            assert isinstance(expected, str)
            preparation.provides("file", str(Path(self.dir, expected)))

    async def cook(self, k: "Kitchen"):
        res = await k.ut1areq(
            SimpleExec(["tar", "xf", self.tar], self.dir), SimpleExec.Result
        )
        if res.exit_code != 0:
            raise RuntimeError(
                f"tar failed with ec {res.exit_code}; stderr = <<<"
                f"\n{res.stderr.decode()}\n>>>"
            )
        # verify extraction produced everything promised in prepare()
        for expect_relative in self.expect_files:
            expect = str(Path(self.dir, expect_relative))
            stat = await k.ut1a(Stat(expect), Stat.Result)
            if stat is None:
                raise RuntimeError(
                    f"tar succeeded but expectation failed; {expect!r} not found."
                )
class RunScript(Recipe):
    """
    Runs a script (such as an installation script).
    """

    _NAME = "script-run"

    def __init__(self, host: str, slug: str, args: dict, head: "Head"):
        super().__init__(host, slug, args, head)
        self.working_dir = check_type(args.get("working_dir"), str)
        # relative to working dir
        self.script = check_type(args.get("script"), str)
        # todo other remote dependencies
        # todo provided files as a result of the script exec

    def prepare(self, preparation: Preparation, head: "Head"):
        super().prepare(preparation, head)
        final_script = str(Path(self.working_dir, self.script))
        preparation.needs("file", final_script)
        # TODO more needs
        # TODO preparation.provides()

    async def cook(self, kitchen: "Kitchen"):
        # NOTE(review): despite the class docstring, nothing here appears to
        # execute the script — cook() only registers a dependency on the
        # script file. Confirm whether an exec step is missing.
        final_script = str(Path(self.working_dir, self.script))
        await depend_remote_file(final_script, kitchen)
class CommandOnChange(Recipe):
    """
    Runs a command when at least one file listed has changed on the remote.
    """

    _NAME = "command-on-change"

    def __init__(self, host: str, slug: str, args: dict, head: Head):
        super().__init__(host, slug, args, head)
        self.purpose = check_type(args.get("purpose"), str)
        self.command = check_type(args.get("command"), list)
        self.watching = check_type(args.get("files"), list)
        self.working_dir = check_type(args.get("working_dir", "/"), str)

    def prepare(self, preparation: Preparation, head: Head) -> None:
        super().prepare(preparation, head)
        # every watched file must exist before we can watch it
        for watched in self.watching:
            preparation.needs("file", watched)

    async def cook(self, kitchen: Kitchen) -> None:
        kitchen.get_dependency_tracker().ignore()
        changed = await kitchen.ut1(HasChangedInSousStore(self.purpose, self.watching))
        if not changed:
            return
        result = await kitchen.ut1areq(
            SimpleExec(self.command, self.working_dir), SimpleExec.Result
        )
        if result.exit_code != 0:
            raise RuntimeError(
                f"exit code not 0 ({result.exit_code}), {result.stderr!r}"
            )
class GitCheckout(Recipe):
    """
    Ensures a git checkout exists at `dest`, synced from `src` and detached at
    either a fixed 'ref' or the tip of a remote 'branch' (exactly one of the
    two must be given).
    """

    _NAME = "git"

    # TODO(correctness): branches can change (tags too), but this will still
    # declare SAFE_TO_SKIP. Perhaps we want to stop that unless you opt out?
    # But oh well for now.

    def __init__(self, host: str, slug: str, args: dict, head: Head):
        super().__init__(host, slug, args, head)
        self.repo_src = check_type(args.get("src"), str)
        self.dest_dir = check_type(args.get("dest"), str)
        self.ref = check_type_opt(args.get("ref"), str)
        self.branch = check_type_opt(args.get("branch"), str)
        if not (self.ref or self.branch):
            raise ValueError("Need to specify 'ref' or 'branch'")
        if self.ref and self.branch:
            raise ValueError("Can't specify both 'ref' and 'branch'.")
        # should end with / if it's a dir
        self.expect: List[str] = check_type(args.get("expect", []), list)
        self.submodules = check_type(args.get("submodules", False), bool)

    def prepare(self, preparation: Preparation, head: Head) -> None:
        super().prepare(preparation, head)
        parent = str(Path(self.dest_dir).parent)
        preparation.needs("directory", parent)
        preparation.provides("directory", self.dest_dir)
        # everything listed in `expect` is provided by this checkout
        for expected in self.expect:
            expected_path_str = str(Path(self.dest_dir, expected))
            if expected.endswith("/"):
                preparation.provides("directory", expected_path_str)
            else:
                preparation.provides("file", expected_path_str)

    async def cook(self, k: Kitchen) -> None:
        # no non-arg dependencies
        k.get_dependency_tracker()
        stat = await k.ut1a(Stat(self.dest_dir), Stat.Result)
        if stat is None:
            # doesn't exist; git init it
            await exec_no_fails(
                k, ["git", "init", self.dest_dir], "/"
            )
            stat = await k.ut1a(Stat(self.dest_dir), Stat.Result)
            if stat is None:
                raise RuntimeError("Directory vanished after creation!")
        if not stat.dir:
            raise RuntimeError("Already exists but not a dir: " + self.dest_dir)
        # add the remote, removing it first to ensure it's what we want
        # don't care if removing fails
        await k.ut1areq(SimpleExec(["git", "remote", "remove", "scone"], self.dest_dir), SimpleExec.Result)
        await exec_no_fails(
            k, ["git", "remote", "add", "scone", self.repo_src], self.dest_dir
        )
        # fetch the latest from the remote
        await exec_no_fails(
            k, ["git", "fetch", "scone"], self.dest_dir
        )
        # figure out what ref we want to use
        # TODO(performance): fetch only this ref?
        ref = self.ref or f"scone/{self.branch}"
        # switch to that ref
        await exec_no_fails(
            k, ["git", "switch", "--detach", ref], self.dest_dir
        )
        # if we use submodules
        if self.submodules:
            await exec_no_fails(
                k, ["git", "submodule", "update", "--init", "--recursive"], self.dest_dir
            )
        # verify the promised files/directories actually materialised
        for expected in self.expect:
            expected_path_str = str(Path(self.dest_dir, expected))
            # TODO(performance, low): parallelise these
            stat = await k.ut1a(Stat(expected_path_str), Stat.Result)
            if not stat:
                raise RuntimeError(f"expected {expected_path_str} to exist but it did not")
            if stat.dir and not expected.endswith("/"):
                raise RuntimeError(f"expected {expected_path_str} to exist as a file but it is a dir")
            if not stat.dir and expected.endswith("/"):
                raise RuntimeError(f"expected {expected_path_str} to exist as a dir but it is a file")

View File

@ -0,0 +1,164 @@
import asyncio
from asyncio import Future
from pathlib import Path
from typing import Dict, cast
from urllib.parse import urlparse
from urllib.request import urlretrieve
from scone.common.misc import sha256_file
from scone.common.modeutils import DEFAULT_MODE_FILE, parse_mode
from scone.default.steps import fridge_steps
from scone.default.steps.fridge_steps import FridgeMetadata, load_and_transform, SUPERMARKET_RELATIVE
from scone.default.utensils.basic_utensils import WriteFile, Chown
from scone.head import Head
from scone.head.kitchen import Kitchen
from scone.head.recipe import Preparation, Recipe
from scone.head.utils import check_type
class FridgeCopy(Recipe):
    """
    Declares that a file should be copied from the head to the sous.
    """

    _NAME = "fridge-copy"

    def __init__(self, host: str, slug: str, args: dict, head: Head):
        super().__init__(host, slug, args, head)
        # locate the source file inside the head's fridge
        fp = fridge_steps.search_in_fridge(head, args["src"])
        if fp is None:
            raise ValueError(f"Cannot find {args['src']} in the fridge.")
        # split off any fridge "extension" (transformation marker) from the name
        unextended_path_str, meta = fridge_steps.decode_fridge_extension(str(fp))
        unextended_path = Path(unextended_path_str)
        dest = args["dest"]
        if not isinstance(dest, str):
            raise ValueError("No destination provided or wrong type.")
        if dest.endswith("/"):
            # dest is a directory: keep the (unextended) source filename
            self.destination: Path = Path(args["dest"], unextended_path.parts[-1])
        else:
            self.destination = Path(args["dest"])
        mode = args.get("mode", DEFAULT_MODE_FILE)
        assert isinstance(mode, str) or isinstance(mode, int)
        self.fridge_path: str = args["src"]
        self.real_path: Path = fp
        self.fridge_meta: FridgeMetadata = meta
        self.mode = parse_mode(mode, directory=False)

    def prepare(self, preparation: Preparation, head: Head) -> None:
        super().prepare(preparation, head)
        preparation.provides("file", str(self.destination))
        preparation.needs("directory", str(self.destination.parent))

    async def cook(self, k: Kitchen) -> None:
        # load the fridge file, applying any transformation implied by `meta`
        data = await load_and_transform(
            k, self.fridge_meta, self.real_path, self.get_host()
        )
        dest_str = str(self.destination)
        chan = await k.start(WriteFile(dest_str, self.mode))
        await chan.send(data)
        # None terminates the write stream
        await chan.send(None)
        if await chan.recv() != "OK":
            raise RuntimeError(f"WriteFail failed on fridge-copy to {self.destination}")
        # this is the wrong thing
        # hash_of_data = sha256_bytes(data)
        # k.get_dependency_tracker().register_remote_file(dest_str, hash_of_data)
        await k.get_dependency_tracker().register_fridge_file(
            self.fridge_path, self.real_path
        )
class Supermarket(Recipe):
    """
    Downloads an asset (cached if necessary) and copies to sous.
    """

    _NAME = "supermarket"

    # dict of target path → future that will complete when it's downloaded
    in_progress: Dict[str, Future] = dict()

    def __init__(self, host: str, slug: str, args: dict, head: "Head"):
        super().__init__(host, slug, args, head)
        self.url = args.get("url")
        assert isinstance(self.url, str)
        self.sha256 = check_type(args.get("sha256"), str).lower()
        dest = args["dest"]
        if not isinstance(dest, str):
            raise ValueError("No destination provided or wrong type.")
        if dest.endswith("/"):
            # dest is a directory: keep the URL's basename as the filename
            file_basename = urlparse(self.url).path.split("/")[-1]
            self.destination: Path = Path(args["dest"], file_basename).resolve()
        else:
            self.destination = Path(args["dest"]).resolve()
        self.owner = check_type(args.get("owner", self.get_user(head)), str)
        self.group = check_type(args.get("group", self.owner), str)
        mode = args.get("mode", DEFAULT_MODE_FILE)
        assert isinstance(mode, str) or isinstance(mode, int)
        self.mode = parse_mode(mode, directory=False)

    def prepare(self, preparation: Preparation, head: "Head"):
        super().prepare(preparation, head)
        preparation.provides("file", str(self.destination))

    async def cook(self, kitchen: "Kitchen"):
        """Download once (cached under the sha256 name), then copy to the sous."""
        # need to ensure we download only once, even in a race…
        supermarket_path = Path(kitchen.head.directory, SUPERMARKET_RELATIVE, self.sha256)
        if self.sha256 in Supermarket.in_progress:
            # someone else is (or was) downloading this asset; wait for them
            await Supermarket.in_progress[self.sha256]
        elif not supermarket_path.exists():
            note = f"""
Scone Supermarket

This file corresponds to {self.url}

Downloaded by {self}
""".strip()
            download_future = cast(
                Future,
                asyncio.get_running_loop().run_in_executor(
                    kitchen.head.pools.threaded,
                    self._download_file,
                    self.url,
                    str(supermarket_path),
                    self.sha256,
                    note
                ),
            )
            Supermarket.in_progress[self.sha256] = download_future
            # BUG FIX: the future used to be stored but never awaited by its
            # creator, so execution could reach the open() below before the
            # download had finished (or even started). Await our own download.
            await download_future

        # TODO(perf): load file in another thread
        # NOTE(review): text mode ("r") will mangle binary assets, and the
        # sha256 check is over raw bytes — confirm whether "rb" was intended.
        with open(supermarket_path, "r") as fin:
            data = fin.read()

        chan = await kitchen.start(WriteFile(str(self.destination), self.mode))
        await chan.send(data)
        # None terminates the write stream
        await chan.send(None)
        if await chan.recv() != "OK":
            raise RuntimeError(f"WriteFail failed on supermarket to {self.destination}")

        await kitchen.ut0(Chown(str(self.destination), self.owner, self.group))

    @staticmethod
    def _download_file(url: str, dest_path: str, check_sha256: str, note: str):
        """Blocking download + integrity check; leaves a .txt provenance note."""
        urlretrieve(url, dest_path)
        real_sha256 = sha256_file(dest_path)
        if real_sha256 != check_sha256:
            raise RuntimeError(
                f"sha256 hash mismatch {real_sha256} != {check_sha256} (wanted)"
            )
        with open(dest_path + ".txt", "w") as fout:
            # leave a note so we can find out what this is if we need to.
            fout.write(note)

View File

@ -0,0 +1,64 @@
import crypt
import logging
from typing import Optional
from scone.default.steps import linux_steps
from scone.default.utensils.linux_utensils import GetPasswdEntry
from scone.head import Head, Recipe
from scone.head.kitchen import Kitchen
from scone.head.recipe import Preparation
from scone.head.utils import check_type, check_type_opt
logger = logging.getLogger(__name__)
class LinuxUser(Recipe):
    """
    Ensures a Linux user account exists on the sous.

    The recipe slug is the username. Existing users are never modified.
    """

    _NAME = "os-user"

    def __init__(self, host: str, slug: str, args: dict, head: Head):
        super().__init__(host, slug, args, head)
        if slug[0] == "@":
            raise ValueError("os-user should be used like [os-user.username].")
        self.user_name = slug
        self.make_group = check_type(args.get("make_group", True), bool)
        self.make_home = check_type(args.get("make_home", True), bool)
        self.home: Optional[str] = check_type_opt(args.get("home"), str)
        # plaintext password; hashed with crypt() before use in cook()
        self.password: Optional[str] = check_type_opt(args.get("password"), str)

    def prepare(self, preparation: Preparation, head: "Head") -> None:
        super().prepare(preparation, head)
        preparation.provides("os-user", self.user_name)
        if self.make_group:
            # creating the user also creates a same-named group
            preparation.provides("os-group", self.user_name)

    async def cook(self, kitchen: Kitchen) -> None:
        # TODO(documentation): note this does not update users
        # acknowledge tracking
        kitchen.get_dependency_tracker()
        if self.password:
            # NOTE(review): the stdlib `crypt` module is deprecated and removed
            # in Python 3.13 — plan a replacement before upgrading.
            password_hash: Optional[str] = crypt.crypt(self.password)
        else:
            password_hash = None
        pwd_entry = await kitchen.ut1a(
            GetPasswdEntry(self.user_name), GetPasswdEntry.Result
        )
        if pwd_entry:
            logger.warning(
                "Not updating existing os-user '%s' as it exists already and "
                "modifications could be dangerous in any case. Modification "
                "support may be implemented in the future.",
                self.user_name,
            )
        else:
            # create the user fresh
            await linux_steps.create_linux_user(
                kitchen,
                self.user_name,
                password_hash,
                self.make_home,
                self.make_group,
                self.home,
            )

View File

@ -0,0 +1,94 @@
from scone.default.utensils.db_utensils import PostgresTransaction
from scone.head import Head, Recipe
from scone.head.kitchen import Kitchen
from scone.head.recipe import Preparation
from scone.head.utils import check_type
class PostgresDatabase(Recipe):
    """
    Ensures a PostgreSQL database exists (created only when absent; never
    altered afterwards). The recipe slug is the database name.
    """

    _NAME = "pg-db"

    def __init__(self, host: str, slug: str, args: dict, head: Head):
        super().__init__(host, slug, args, head)
        self.database_name = slug
        self.owner = check_type(args.get("owner"), str)
        self.encoding = args.get("encoding", "utf8")
        self.collate = args.get("collate", "en_GB.utf8")
        self.ctype = args.get("ctype", "en_GB.utf8")
        self.template = args.get("template", "template0")

    def prepare(self, preparation: Preparation, head: Head) -> None:
        super().prepare(preparation, head)
        # todo

    async def cook(self, kitchen: Kitchen) -> None:
        ch = await kitchen.start(PostgresTransaction("postgres"))
        # NOTE(review): the "?" placeholder style must match whatever
        # PostgresTransaction expects — confirm against the utensil.
        await ch.send(
            (
                "SELECT 1 AS count FROM pg_catalog.pg_database WHERE datname = ?;",
                self.database_name,
            )
        )
        dbs = await ch.recv()
        if len(dbs) > 0 and dbs[0]["count"] == 1:
            # database already present: nothing to do
            await ch.send(None)
            await ch.wait_close()
            return
        # NOTE(review): identifiers/options are interpolated unquoted into the
        # SQL below — acceptable for trusted config, but a quoting/injection
        # hazard if recipe args could ever come from untrusted input.
        q = f"""
        CREATE DATABASE {self.database_name}
            WITH OWNER {self.owner}
            ENCODING {self.encoding}
            LC_COLLATE {self.collate}
            LC_CTYPE {self.ctype}
            TEMPLATE {self.template};
        """
        await ch.send((q,))
        res = await ch.recv()
        if len(res) != 0:
            raise RuntimeError("expected empty result set.")
        await ch.send(None)
        await ch.wait_close()
class PostgresUser(Recipe):
    """
    Ensures a PostgreSQL login role exists (created only when absent; never
    altered afterwards). The recipe slug is the role name.
    """

    _NAME = "pg-user"

    def __init__(self, host: str, slug: str, args: dict, head: Head):
        super().__init__(host, slug, args, head)
        self.user_name = slug
        self.password = check_type(args.get("password"), str)

    def prepare(self, preparation: Preparation, head: Head) -> None:
        super().prepare(preparation, head)
        # todo

    async def cook(self, kitchen: Kitchen) -> None:
        ch = await kitchen.start(PostgresTransaction("postgres"))
        # NOTE(review): the "?" placeholder style must match whatever
        # PostgresTransaction expects — confirm against the utensil.
        await ch.send(
            (
                "SELECT 1 AS count FROM pg_catalog.pg_user WHERE usename = ?;",
                self.user_name,
            )
        )
        dbs = await ch.recv()
        if len(dbs) > 0 and dbs[0]["count"] == 1:
            # role already present: nothing to do
            await ch.send(None)
            await ch.wait_close()
            return
        # NOTE(review): the role name is interpolated unquoted into the SQL —
        # acceptable for trusted config, but a quoting/injection hazard if
        # recipe args could ever come from untrusted input.
        q = f"""
        CREATE ROLE {self.user_name}
            WITH PASSWORD ?
            LOGIN;
        """
        await ch.send((q, self.password))
        res = await ch.recv()
        if len(res) != 0:
            raise RuntimeError("expected empty result set.")
        await ch.send(None)
        await ch.wait_close()

View File

@ -0,0 +1,82 @@
from pathlib import Path
from typing import Tuple, List
from scone.default.recipes.apt import AptPackage
from scone.default.steps.basic_steps import exec_no_fails
from scone.default.steps.filesystem_steps import depend_remote_file
from scone.head import Head, Recipe
from scone.head.kitchen import Kitchen
from scone.head.recipe import Preparation
from scone.head.utils import check_type
class PythonVenv(Recipe):
    """
    Creates a Python virtualenv and installs a set of requirements into it.

    Note: using a directory as a dependency can be inefficient, since a
    SHA-256 over the whole directory is needed to detect changes.
    """

    _NAME = "python-venv"

    def __init__(self, host: str, slug: str, args: dict, head: Head):
        super().__init__(host, slug, args, head)
        self.dir = check_type(args.get("dir"), str)
        self.interpreter = check_type(args.get("interpreter"), str)
        # Each install line is "<flags...> <target>"; supported flags:
        # git (local git repo — track hash by git commit hash), dir, -r
        self.install: List[Tuple[str, List[str]]] = []
        for install_line in check_type(args.get("install"), list):
            *flags, target = install_line.split(" ")
            self.install.append((target, flags))
        self.no_apt_install = check_type(args.get("_no_apt_install", False), bool)
        # TODO(sdists)

    def prepare(self, preparation: Preparation, head: Head):
        super().prepare(preparation, head)
        preparation.needs("dir", str(Path(self.dir).parent))
        for target, flags in self.install:
            if "-r" in flags:
                preparation.needs("file", target)
            elif "git" in flags or "dir" in flags:
                preparation.needs("dir", target)
        preparation.provides("file", str(Path(self.dir, "bin/python")))
        if not self.no_apt_install:
            preparation.subrecipe(
                AptPackage(
                    self.get_host(), "@venv-apt", {"packages": ["python3-venv"]}, head
                )
            )
            preparation.needs("apt-stage", "packages-installed")

    async def cook(self, kitchen: Kitchen):
        tracker = kitchen.get_dependency_tracker()
        await exec_no_fails(
            kitchen, [self.interpreter, "-m", "venv", self.dir], "/"
        )
        pip_args = []
        for target, flags in self.install:
            if "-r" in flags:
                pip_args.append("-r")
                await depend_remote_file(target, kitchen)
            elif "dir" in flags or "git" in flags:
                # TODO(perf, dedup): custom dynamic dependency types; git
                # dependencies and sha256_dir dependencies.
                tracker.ignore()
            pip_args.append(target)
        await exec_no_fails(
            kitchen, [self.dir + "/bin/pip", "install"] + pip_args, "/"
        )

View File

@ -0,0 +1,114 @@
from typing import Dict
from scone.default.recipes.filesystem import CommandOnChange
from scone.default.utensils.basic_utensils import SimpleExec
from scone.head import Head, Recipe
from scone.head.kitchen import Kitchen
from scone.head.recipe import Preparation
from scone.head.utils import check_type, check_type_opt
class SystemdUnit(Recipe):
    """
    Shorthand for a systemd unit. Metarecipe: it only spawns subrecipes.
    """

    _NAME = "systemd"

    # Per-host cache of the daemon-reload subrecipe, so every unit on one
    # host shares a single `systemctl daemon-reload` trigger.
    daemon_reloaders: Dict[str, CommandOnChange] = {}

    def __init__(self, host: str, slug: str, args: dict, head: Head):
        super().__init__(host, slug, args, head)
        # A slug without a dot is assumed to be a .service unit.
        self.unit_name = slug if "." in slug else slug + ".service"
        self.at = check_type(args.get("at"), str)
        self.enabled = check_type_opt(args.get("enabled"), bool)
        self.restart_on = check_type_opt(args.get("restart_on"), list)

    def prepare(self, preparation: Preparation, head: Head) -> None:
        super().prepare(preparation, head)
        preparation.provides("systemd-unit", self.unit_name)
        preparation.needs("systemd-stage", "daemon-reloaded")
        if self.enabled is not None:
            enable_recipe = SystemdEnabled(
                self.get_host(),
                self.unit_name,
                {"enabled": self.enabled, "at": self.at, ".user": "root"},
                head,
            )
            preparation.subrecipe(enable_recipe)
            preparation.needs("systemd-stage", "enabled")
        daemon_reloader = SystemdUnit.daemon_reloaders.get(self.get_host(), None)
        if not daemon_reloader:
            # TODO this should be replaced with a dedicated command which
            #  provides those units.
            daemon_reloader = CommandOnChange(
                self.get_host(),
                "systemd-internal",
                {
                    "purpose": "systemd.daemon_reload",
                    "command": ["systemctl", "daemon-reload"],
                    "files": [],
                    ".user": "root",
                },
                head,
            )
            # BUG FIX: the reloader was created but never stored in the
            # cache, so every unit registered its own daemon-reload
            # subrecipe and the dedup dict was useless.
            SystemdUnit.daemon_reloaders[self.get_host()] = daemon_reloader
            preparation.subrecipe(daemon_reloader)
        # Always add this unit's file to the (possibly shared) reloader.
        file_list = getattr(daemon_reloader, "_args")["files"]
        file_list.append(self.at)
        if self.restart_on:
            # NOTE(review): despite the name "restart_on", this issues
            # `systemctl reload` — confirm intended.
            service_reloader = CommandOnChange(
                self.get_host(),
                "systemd-internal",
                {
                    "purpose": "systemd.unit_reload",
                    "command": ["systemctl", "reload", self.unit_name],
                    "files": self.restart_on + [self.at],
                    ".user": "root",
                },
                head,
            )
            preparation.subrecipe(service_reloader)

    async def cook(self, kitchen: Kitchen) -> None:
        # metarecipes don't do anything.
        kitchen.get_dependency_tracker().ignore()
class SystemdEnabled(Recipe):
    """
    Sets the enabled state of the systemd unit.
    """

    _NAME = "systemd-enabled"

    def __init__(self, host: str, slug: str, args: dict, head: Head):
        super().__init__(host, slug, args, head)
        # Slugs without an extension default to .service units.
        self.unit_name = slug if "." in slug else slug + ".service"
        self.at = check_type(args.get("at"), str)
        self.enabled = check_type_opt(args.get("enabled"), bool)

    def prepare(self, preparation: Preparation, head: Head) -> None:
        super().prepare(preparation, head)
        preparation.needs("file", self.at)
        preparation.needs("systemd-stage", "daemon-reloaded")

    async def cook(self, kitchen: Kitchen) -> None:
        kitchen.get_dependency_tracker()
        action = "enable" if self.enabled else "disable"
        result = await kitchen.ut1areq(
            SimpleExec(["systemctl", action, self.unit_name], "/"),
            SimpleExec.Result,
        )
        if result.exit_code != 0:
            raise RuntimeError(
                f"Failed to en/disable {self.unit_name}: {result.stderr.decode()}"
            )

View File

View File

@ -0,0 +1,57 @@
from pathlib import PurePath
from typing import List, Optional, Union
from scone.default.utensils.basic_utensils import SimpleExec
from scone.head import Recipe
from scone.head.exceptions import CookingError
from scone.head.kitchen import Kitchen, current_recipe
class ExecutionFailure(CookingError):
    """
    Raised when a command run on a sous exits with a non-zero status.
    """

    def __init__(
        self,
        args: List[str],
        working_dir: str,
        sous: str,
        user: str,
        result: SimpleExec.Result,
    ):
        # Indent continuation lines of stderr so they read as one item.
        indented_stderr = result.stderr.decode().replace("\n", "\n ")
        super().__init__(
            f"Command failed on {sous} (user {user}) in {working_dir}.\n"
            f"The command was: {args}\n"
            f"Stderr was:\n {indented_stderr}"
        )
async def exec_no_fails(
    kitchen: Kitchen, args: List[str], working_dir: Union[str, PurePath]
) -> SimpleExec.Result:
    """Run a command on the sous, raising ExecutionFailure on non-zero exit."""
    cwd = working_dir if isinstance(working_dir, str) else str(working_dir)
    result = await kitchen.start_and_consume_attrs(
        SimpleExec(args, cwd), SimpleExec.Result
    )
    if result.exit_code == 0:
        return result
    # Attach sous/user context to the failure when a recipe is in scope.
    recipe: Optional[Recipe] = current_recipe.get(None)  # type: ignore
    if recipe is None:
        raise ExecutionFailure(args, cwd, "???", "???", result)
    raise ExecutionFailure(
        args,
        cwd,
        recipe.get_host(),
        recipe.get_user(kitchen.head),
        result,
    )

View File

@ -0,0 +1,7 @@
from scone.default.utensils.basic_utensils import HashFile
from scone.head.kitchen import Kitchen
async def depend_remote_file(path: str, kitchen: Kitchen) -> None:
    """Hash `path` on the sous and record it as a dynamic file dependency."""
    digest = await kitchen.ut1(HashFile(path))
    kitchen.get_dependency_tracker().register_remote_file(path, digest)

View File

@ -0,0 +1,74 @@
from enum import Enum
from pathlib import Path, PurePath
from typing import List, Optional, Tuple, Union
from jinja2 import Template
from scone.head import Head
from scone.head.kitchen import Kitchen
SUPERMARKET_RELATIVE = ".scone-cache/supermarket"


def get_fridge_dirs(head: Head) -> List[Path]:
    """Return the ordered list of fridge directories for this head."""
    # TODO expand with per-sous/per-group dirs?
    return [Path(head.directory) / "fridge"]
def search_in_dirlist(
    dirlist: List[Path], relative: Union[str, PurePath]
) -> Optional[Path]:
    """Return the first existing `relative` under a dirlist entry, else None."""
    for base in dirlist:
        candidate = base / relative
        if candidate.exists():
            return candidate
    return None
def search_in_fridge(head: Head, relative: Union[str, PurePath]) -> Optional[Path]:
    """Look up `relative` in the head's fridge directories."""
    return search_in_dirlist(get_fridge_dirs(head), relative)
class FridgeMetadata(Enum):
    """How a fridge file should be handled, based on its extension."""

    FRIDGE = 0
    FROZEN = 1
    TEMPLATE = 2


def decode_fridge_extension(path: str) -> Tuple[str, FridgeMetadata]:
    """Strip any recognised fridge extension, returning (path, metadata)."""
    known = {
        ".frozen": FridgeMetadata.FROZEN,
        ".j2": FridgeMetadata.TEMPLATE,
        # don't know if we want to support .j2.frozen, but we could in the future
    }
    for suffix, meta in known.items():
        if path.endswith(suffix):
            return path[: -len(suffix)], meta
    return path, FridgeMetadata.FRIDGE
async def load_and_transform(
    kitchen: Kitchen, meta: FridgeMetadata, fullpath: Path, sous: str
) -> bytes:
    """
    Read a fridge file and transform it according to its metadata.

    FROZEN files are decrypted with the head's secret; TEMPLATE files are
    rendered through Jinja2 with dependency-tracking variable proxies.
    Raises RuntimeError for a FROZEN file when secret access is disabled.
    """
    head = kitchen.head
    # TODO(perf) don't do this in async loop
    with fullpath.open("rb") as file:
        data = file.read()
    if meta == FridgeMetadata.FROZEN:
        # decrypt
        if head.secret_access is None:
            raise RuntimeError("Frozen file but no secret access enabled!")
        data = head.secret_access.decrypt_bytes(data)
    elif meta == FridgeMetadata.TEMPLATE:
        # pass through Jinja2
        template = Template(data.decode())
        proxies = kitchen.get_dependency_tracker().get_j2_compatible_dep_var_proxies(
            head.variables[sous]
        )
        data = template.render(proxies).encode()
    # BUG FIX: removed leftover debug `print` that dumped the whole file
    # contents (including decrypted secrets) to stdout.
    return data

View File

@ -0,0 +1,41 @@
from typing import Optional
from scone.default.utensils.basic_utensils import SimpleExec
from scone.head.kitchen import Kitchen
async def create_linux_user(
    kitchen: Kitchen,
    name: str,
    password_hash: Optional[str],
    create_home: bool = True,
    create_group: bool = True,
    home: Optional[str] = None,
) -> None:
    """Create a Linux user on the sous with useradd, raising on failure."""
    command = ["useradd"]
    if password_hash:
        # N.B. if you don't use a password hash, the account will be locked
        # but passwordless SSH still works
        command += ["-p", password_hash]
    if create_home:
        command.append("-m")
    command.append("-U" if create_group else "-N")
    if home:
        command += ["-d", home]
    # the user name goes last
    command.append(name)
    result = await kitchen.ut1areq(SimpleExec(command, "/"), SimpleExec.Result)
    if result.exit_code != 0:
        raise RuntimeError(
            "Failed to create user. Error was: " + result.stderr.strip().decode()
        )

View File

View File

@ -0,0 +1,149 @@
import asyncio
import grp
import logging
import os
import pwd
import shutil
import stat
from typing import List
import attr
from scone.common.chanpro import Channel
from scone.common.misc import sha256_file
from scone.sous.utensils import Utensil, Worktop
@attr.s(auto_attribs=True)
class WriteFile(Utensil):
    # Destination path on the sous.
    path: str
    # Mode bits to apply at creation time (umask is bypassed).
    mode: int

    async def execute(self, channel: Channel, worktop):
        """Stream bytes chunks from the channel into the file until None."""
        oldumask = os.umask(0)
        try:
            fdnum = os.open(
                self.path, os.O_WRONLY | os.O_CREAT | os.O_TRUNC, self.mode
            )
        finally:
            # BUG FIX: restore the process umask even if os.open fails;
            # previously a failure left the umask clobbered at 0.
            os.umask(oldumask)
        with open(fdnum, "wb") as file:
            while True:
                next_chunk = await channel.recv()
                if next_chunk is None:
                    break
                assert isinstance(next_chunk, bytes)
                file.write(next_chunk)
        await channel.send("OK")
@attr.s(auto_attribs=True)
class MakeDirectory(Utensil):
    # Path of the directory to create on the sous.
    path: str
    # Mode bits to apply (umask is bypassed).
    mode: int

    async def execute(self, channel: Channel, worktop):
        """Create the directory with exactly the requested mode."""
        oldumask = os.umask(0)
        try:
            os.mkdir(self.path, self.mode)
        finally:
            # BUG FIX: restore the umask even when mkdir raises (e.g. the
            # directory already exists); previously it stayed clobbered at 0.
            os.umask(oldumask)
@attr.s(auto_attribs=True)
class Stat(Utensil):
    # Path on the sous to stat; symlinks are NOT followed.
    path: str

    logger = logging.getLogger(__name__)

    @attr.s(auto_attribs=True)
    class Result:
        uid: int
        gid: int
        dir: bool
        user: str
        group: str
        mode: int

    async def execute(self, channel: Channel, worktop):
        # Sends a Stat.Result over the channel, or None if the path is absent.
        try:
            self.logger.debug("going to stat")
            stat_result = os.stat(self.path, follow_symlinks=False)
        except FileNotFoundError:
            self.logger.debug(":(")
            await channel.send(None)
            return
        # Resolve numeric ids to names via the sous's passwd/group databases.
        self.logger.debug("going to user")
        user = pwd.getpwuid(stat_result.st_uid).pw_name
        self.logger.debug("going to grp")
        group = grp.getgrgid(stat_result.st_gid).gr_name
        self.logger.debug("going to respond")
        await channel.send(
            Stat.Result(
                uid=stat_result.st_uid,
                gid=stat_result.st_gid,
                dir=stat.S_ISDIR(stat_result.st_mode),
                user=user,
                group=group,
                mode=stat_result.st_mode,
            )
        )
@attr.s(auto_attribs=True)
class Chown(Utensil):
    # Target path on the sous.
    path: str
    # Owning user and group, by name (resolved by shutil.chown).
    user: str
    group: str

    async def execute(self, channel: Channel, worktop):
        # No response is sent; completion is implied by channel behaviour.
        shutil.chown(self.path, self.user, self.group)
@attr.s(auto_attribs=True)
class Chmod(Utensil):
    # Target path on the sous.
    path: str
    # New mode bits.
    mode: int

    async def execute(self, channel: Channel, worktop):
        # No response is sent; completion is implied by channel behaviour.
        os.chmod(self.path, self.mode)
@attr.s(auto_attribs=True)
class SimpleExec(Utensil):
    # Argument vector to execute (no shell involved).
    args: List[str]
    # Working directory for the child process.
    working_dir: str

    @attr.s(auto_attribs=True)
    class Result:
        exit_code: int
        stdout: bytes
        stderr: bytes

    async def execute(self, channel: Channel, worktop: Worktop):
        """Run the command, capture its output and send back a Result."""
        process = await asyncio.create_subprocess_exec(
            *self.args,
            stdin=None,
            stdout=asyncio.subprocess.PIPE,
            stderr=asyncio.subprocess.PIPE,
            cwd=self.working_dir
        )
        out, err = await process.communicate()
        # communicate() waits for exit, so the return code must be set now.
        code = process.returncode
        assert code is not None
        await channel.send(
            SimpleExec.Result(exit_code=code, stdout=out, stderr=err)
        )
@attr.s(auto_attribs=True)
class HashFile(Utensil):
    # Path of the file to hash on the sous.
    path: str

    async def execute(self, channel: Channel, worktop: Worktop):
        """Compute the file's SHA-256 off-thread and send it back."""
        loop = asyncio.get_running_loop()
        digest = await loop.run_in_executor(
            worktop.pools.threaded, sha256_file, self.path
        )
        await channel.send(digest)

View File

@ -0,0 +1,25 @@
import attr
from scone.common.chanpro import Channel
from scone.sous import Utensil
from scone.sous.utensils import Worktop
@attr.s(auto_attribs=True)
class PostgresTransaction(Utensil):
    # Name of the database to connect to.
    database: str

    async def execute(self, channel: Channel, worktop: Worktop) -> None:
        """
        Run queries received over the channel inside a single transaction.

        Each message is a tuple of (query, *args); a bare None message (as
        sent by the head) ends the transaction. Result rows are sent back
        as a list of dicts per query.
        """
        import asyncpg

        # BUG FIX: asyncpg.connect() returns a coroutine, not an async
        # context manager, so it must be awaited and closed explicitly.
        conn = await asyncpg.connect(database=self.database)
        try:
            async with conn.transaction():
                while True:
                    message = await channel.recv()
                    # BUG FIX: the head signals completion with a bare None,
                    # which the old tuple-unpacking crashed on.
                    if message is None:
                        break
                    query, *args = message
                    if query is None:
                        break
                    results = [
                        dict(record) for record in await conn.fetch(query, *args)
                    ]
                    await channel.send(results)
        finally:
            await conn.close()

View File

@ -0,0 +1,74 @@
import asyncio
import sqlite3
from pathlib import Path
from typing import Dict, List
import attr
from scone.common.chanpro import Channel
from scone.common.misc import sha256_file
from scone.sous import Utensil
from scone.sous.utensils import Worktop
@attr.s(auto_attribs=True)
class CanSkipDynamic(Utensil):
    # Map of sous-side file path -> hash recorded at the last cook.
    sous_file_hashes: Dict[str, str]

    async def execute(self, channel: Channel, worktop: Worktop):
        """Send True iff every tracked file still matches its recorded hash."""
        loop = asyncio.get_running_loop()
        for path, expected in self.sous_file_hashes.items():
            try:
                actual = await loop.run_in_executor(
                    worktop.pools.threaded, sha256_file, path
                )
            except IOError:
                # TODO should we log this? NB includes FileNotFound...
                await channel.send(False)
                return
            if actual != expected:
                await channel.send(False)
                return
        await channel.send(True)
@attr.s(auto_attribs=True)
class HasChangedInSousStore(Utensil):
    # Namespace for the hash records, so different users don't collide.
    purpose: str
    # Files to check and record.
    paths: List[str]

    def _sync_execute(self, worktop: Worktop) -> bool:
        """Blocking body: compare/refresh hashes in the sous's sqlite store."""
        db = sqlite3.connect(Path(worktop.dir, "sous_store.db"))
        try:
            # `with db` only scopes the transaction (commit/rollback) — the
            # sqlite3 connection context manager does NOT close the handle,
            # which the original code leaked on every call.
            with db:
                db.execute(
                    """
                    CREATE TABLE IF NOT EXISTS hash_store
                    (purpose TEXT, path TEXT, hash TEXT, PRIMARY KEY (purpose, path))
                    """
                )
                changed = False
                for file in self.paths:
                    real_hash = sha256_file(file)
                    c = db.execute(
                        "SELECT hash FROM hash_store WHERE purpose=? AND path=?",
                        (self.purpose, file),
                    )
                    db_hash = c.fetchone()
                    if db_hash is None:
                        changed = True
                        db.execute(
                            "INSERT INTO hash_store VALUES (?, ?, ?)",
                            (self.purpose, file, real_hash),
                        )
                    elif db_hash[0] != real_hash:
                        changed = True
                        db.execute(
                            "UPDATE hash_store SET hash=? WHERE purpose=? AND path=?",
                            (real_hash, self.purpose, file),
                        )
            return changed
        finally:
            db.close()

    async def execute(self, channel: Channel, worktop: Worktop):
        """Run the blocking check in the thread pool and send the answer."""
        answer = await asyncio.get_running_loop().run_in_executor(
            worktop.pools.threaded, self._sync_execute, worktop
        )
        await channel.send(answer)

View File

@ -0,0 +1,35 @@
import pwd
import attr
from scone.common.chanpro import Channel
from scone.sous import Utensil
from scone.sous.utensils import Worktop
@attr.s(auto_attribs=True)
class GetPasswdEntry(Utensil):
    # Login name to look up in the sous's passwd database.
    user_name: str

    @attr.s(auto_attribs=True)
    class Result:
        uid: int
        gid: int
        home: str
        shell: str

    async def execute(self, channel: Channel, worktop: Worktop):
        """Send the passwd entry for user_name, or None if it is absent."""
        try:
            record = pwd.getpwnam(self.user_name)
        except KeyError:
            await channel.send(None)
        else:
            await channel.send(
                GetPasswdEntry.Result(
                    uid=record.pw_uid,
                    gid=record.pw_gid,
                    home=record.pw_dir,
                    shell=record.pw_shell,
                )
            )

201
scone/head/__init__.py Normal file
View File

@ -0,0 +1,201 @@
import copy
import itertools
import logging
import re
import sys
from os import path
from pathlib import Path
from typing import Any, Dict, List, Optional, Tuple, cast
import toml
from nacl.encoding import URLSafeBase64Encoder
from scone.common.loader import ClassLoader
from scone.common.misc import eprint
from scone.common.pools import Pools
from scone.head import menu_reader
from scone.head.menu_reader import HostMenu, Menu
from scone.head.recipe import Recipe, recipe_name_getter
from scone.head.secrets import SecretAccess
from scone.head.variables import Variables, merge_right_into_left_inplace
logger = logging.getLogger(__name__)
class Head:
    """
    The head: holds configuration, variables and the menu, and constructs
    the recipes to be cooked on each sous.
    """

    def __init__(
        self,
        directory: str,
        recipe_loader: ClassLoader[Recipe],
        menu: Menu,
        sous: Dict[str, dict],
        groups: Dict[str, List[str]],
        secret_access: Optional[SecretAccess],
        pools: Pools,
    ):
        self.directory = directory
        self.recipe_loader = recipe_loader
        self.menu = menu
        self.souss = sous
        self.groups = groups
        self.secret_access = secret_access
        # Per-sous variables, populated by _load_variables().
        self.variables: Dict[str, Variables] = dict()
        self.pools = pools

    @staticmethod
    def open(directory: str):
        """Load a head from `directory` (must contain scone.head.toml)."""
        with open(path.join(directory, "scone.head.toml")) as head_toml:
            head_data = toml.load(head_toml)
        secret_access: Optional[SecretAccess] = None
        # Secret access is only enabled when a restaurant_id is configured.
        if "freezer" in head_data and "restaurant_id" in head_data["freezer"]:
            secret_access = SecretAccess(head_data["freezer"]["restaurant_id"])
            secret_access.get_existing()
            if not secret_access.key:
                eprint("Failed to load freezer secret.")
                sys.exit(12)
        recipe_module_roots = head_data.get("recipe_roots", ["scone.default.recipes"])
        # load available recipes
        recipe_loader: ClassLoader[Recipe] = ClassLoader(Recipe, recipe_name_getter)
        for recipe_root in recipe_module_roots:
            recipe_loader.add_package_root(recipe_root)
        sous = head_data.get("sous", dict())
        groups = head_data.get("group", dict())
        # NOTE(review): this overwrites any user-defined group named "all".
        groups["all"] = list(sous.keys())
        # load the menu
        menu = menu_reader.parse_toml_menu_descriptors(path.join(directory, "menu"))
        pools = Pools()
        head = Head(directory, recipe_loader, menu, sous, groups, secret_access, pools)
        head._load_variables()
        return head

    def _preload_variables(self, who_for: str) -> Tuple[dict, dict]:
        """
        Load variable files for one sous/group from vars/<who_for>/.

        Returns (chilled, frozen): *.v.toml files are plain ("chilled")
        variables; *.vf.toml files hold encrypted values that are thawed
        (decrypted) here. Raises if a frozen value is neither str nor dict.
        """
        out_frozen: Dict[str, Any] = {}
        out_chilled: Dict[str, Any] = {}
        vardir = Path(self.directory, "vars", who_for)
        logger.debug("preloading vars for %s in %s", who_for, str(vardir))
        for file in vardir.glob("*.vf.toml"):
            if not file.is_file():
                continue
            with file.open() as var_file:
                logger.debug("Opened %s for frozen vars", file)
                frozen_vars = cast(Dict[Any, Any], toml.load(var_file))
                merge_right_into_left_inplace(out_frozen, frozen_vars)
        for file in vardir.glob("*.v.toml"):
            if not file.is_file():
                continue
            with file.open() as var_file:
                logger.debug("Opened %s for vars", file)
                chilled_vars = cast(Dict[Any, Any], toml.load(var_file))
                merge_right_into_left_inplace(out_chilled, chilled_vars)
        # Thaw: each string in the frozen tree is base64 ciphertext, which is
        # decrypted in place; nested dicts are walked with an explicit stack.
        to_transform = [out_frozen]
        while to_transform:
            next_dict = to_transform.pop()
            for k, v in next_dict.items():
                if isinstance(v, str):
                    b64_secret = re.sub(r"\s", "", v)
                    if not self.secret_access:
                        raise RuntimeError("Secret access disabled; cannot thaw.")
                    next_dict[k] = self.secret_access.decrypt_bytes(
                        b64_secret.encode(), encoder=URLSafeBase64Encoder
                    ).decode()
                elif isinstance(v, dict):
                    to_transform.append(v)
                else:
                    raise ValueError(f"Not permitted in frozen variables file: '{v}'.")
        return out_chilled, out_frozen

    def _load_variables(self):
        """Build self.variables for each sous, layering all < groups < sous."""
        preload: Dict[str, Tuple[dict, dict]] = dict()
        for who_name in itertools.chain(self.souss, self.groups):
            preload[who_name] = self._preload_variables(who_name)
        for sous_name in self.souss:
            # Merge order: "all", then the sous's other groups, then itself.
            order = ["all"]
            order += [
                group
                for group, members in self.groups.items()
                if sous_name in members and group != "all"
            ]
            order.append(sous_name)
            chilled: Dict[str, Any] = {}
            frozen: Dict[str, Any] = {}
            for who_name in order:
                in_chilled, in_frozen = preload[who_name]
                merge_right_into_left_inplace(chilled, in_chilled)
                merge_right_into_left_inplace(frozen, in_frozen)
            sous_vars = Variables()
            sous_vars.load_plain(frozen)
            sous_vars.load_vars_with_substitutions(chilled)
            self.variables[sous_name] = sous_vars

    def _construct_hostmenu_for(
        self, hostmenu: HostMenu, host: str, recipe_list: List[Recipe], head: "Head"
    ) -> None:
        """Instantiate every dish of `hostmenu` for `host` into recipe_list."""
        for recipe_id, dishes in hostmenu.dishes.items():
            recipe_cls = self.recipe_loader.get_class(recipe_id)
            if not recipe_cls:
                raise RuntimeError(f"Unable to find recipe class for '{recipe_id}'.")
            for slug, args in dishes.items():
                # Deep-copy so variable substitution cannot mutate the menu.
                args = copy.deepcopy(args)
                self.variables[host].substitute_inplace_in_dict(args)
                recipe = recipe_cls.from_menu(host, slug, args, head)
                recipe_list.append(recipe)

    def construct_recipes(self):
        """Return a dict of sous name -> list of constructed recipes."""
        recipes = {}
        for sous in self.souss:
            logger.debug("Constructing recipes for %s", sous)
            sous_recipe_list: List[Recipe] = []
            # construct recipes for it only
            sous_hm = self.menu.hostmenus.get(sous)
            if sous_hm is not None:
                self._construct_hostmenu_for(sous_hm, sous, sous_recipe_list, self)
            # construct recipes for it that are for groups it is in
            for group, members in self.groups.items():
                if sous in members:
                    group_hm = self.menu.hostmenus.get(group)
                    if group_hm is not None:
                        self._construct_hostmenu_for(
                            group_hm, sous, sous_recipe_list, self
                        )
            recipes[sous] = sous_recipe_list
            logger.info("Constructed %d recipes for %s.", len(sous_recipe_list), sous)
        return recipes

    def debug_info(self) -> str:
        """Render a human-readable summary of the head's configuration."""
        lines = []
        lines.append("Head Configuration")
        lines.append(" Sous List")
        for name, sous in self.souss.items():
            lines.append(f" - {name} = {sous}")
        lines.append("")
        lines.append(" Sous Groups")
        for name, group in self.groups.items():
            lines.append(f" - {name} = {group}")
        lines.append("")
        lines += [" " + line for line in str(self.recipe_loader).splitlines()]
        lines.append("")
        lines += [" " + line for line in str(self.menu).splitlines()]
        lines.append("")
        return "\n".join(lines)

132
scone/head/cli/__init__.py Normal file
View File

@ -0,0 +1,132 @@
import asyncio
import logging
import os
import sys
import time
from argparse import ArgumentParser
from pathlib import Path
from scone.common.misc import eprint
from scone.common.pools import Pools
from scone.head import Head
from scone.head.dependency_tracking import DependencyCache, run_dep_checks
from scone.head.kitchen import Kitchen
from scone.head.recipe import Preparation, Recipe
def cli() -> None:
    """Synchronous entry point: set up logging, run the async CLI, exit."""
    logging.basicConfig()
    logging.getLogger("scone").setLevel(logging.DEBUG)
    exit_code = asyncio.get_event_loop().run_until_complete(cli_async())
    sys.exit(exit_code)
async def cli_async() -> int:
    """
    Async CLI body: locate the head, plan the courses, confirm and cook.

    Returns a process exit code (0 on success).
    """
    dep_cache = None
    try:
        args = sys.argv[1:]
        parser = ArgumentParser(description="Cook!")
        parser.add_argument("hostspec", type=str, help="Sous or group name")
        parser.add_argument(
            "--yes",
            "-y",
            action="store_true",
            default=False,
            help="Don't prompt for confirmation",
        )
        argp = parser.parse_args(args)
        eprint("Loading head…")
        cdir = Path(os.getcwd())
        # Walk up the directory tree until the head marker file is found.
        while not Path(cdir, "scone.head.toml").exists():
            cdir = cdir.parent
            if len(cdir.parts) <= 1:
                eprint("Don't appear to be in a head. STOP.")
                return 1
        head = Head.open(str(cdir))
        eprint(head.debug_info())
        hosts = set()
        if argp.hostspec in head.souss:
            hosts.add(argp.hostspec)
        elif argp.hostspec in head.groups:
            for sous in head.groups[argp.hostspec]:
                hosts.add(sous)
        else:
            eprint(f"Unrecognised sous or group: '{argp.hostspec}'")
            # Consistency fix: return the code like the other error path
            # above, instead of calling sys.exit() inside the coroutine.
            return 1
        eprint(f"Selected the following souss: {', '.join(hosts)}")
        recipes_by_sous = head.construct_recipes()
        recipes_to_do = []
        for sous in hosts:
            recipes_to_do += recipes_by_sous.get(sous, [])
        eprint(f"Preparing {len(recipes_to_do)} recipes…")
        prepare = Preparation(recipes_to_do)
        start_ts = time.monotonic()
        order = prepare.prepare(head)
        notifying_provides = prepare.notifying_provides
        del prepare
        end_ts = time.monotonic()
        eprint(f"Preparation completed in {end_ts - start_ts:.3f} s.")
        eprint(f"{len(order)} courses planned.")
        dep_cache = await DependencyCache.open(
            os.path.join(head.directory, "depcache.sqlite3")
        )
        eprint("Checking dependency cache…")
        start_ts = time.monotonic()
        depchecks = await run_dep_checks(head, dep_cache, order)
        end_ts = time.monotonic()
        eprint(f"Checking finished in {end_ts - start_ts:.3f} s.")  # TODO show counts
        for epoch, items in enumerate(order):
            print(f"----- Course {epoch} -----")
            for item in items:
                if isinstance(item, Recipe):
                    state = depchecks[item].label.name
                    print(f" > recipe ({state}) {item}")
                elif isinstance(item, tuple):
                    kind, ident, extra = item
                    print(f" - we now have {kind} {ident} {dict(extra)}")
        eprint("Ready to cook? [y/N]: ", end="")
        if argp.yes:
            eprint("y (due to --yes)")
        else:
            if not input().lower().startswith("y"):
                eprint("Stopping.")
                return 101
        kitchen = Kitchen(head, dep_cache, notifying_provides)
        for epoch, epoch_items in enumerate(order):
            print(f"Cooking Course {epoch} of {len(order)}")
            await kitchen.run_epoch(
                epoch_items, depchecks, concurrency_limit_per_host=8
            )
        # Forget cache rows that this run did not refresh.
        for sous in hosts:
            await dep_cache.sweep_old(sous)
        return 0
    finally:
        Pools.get().shutdown()
        if dep_cache:
            await dep_cache.db.close()
if __name__ == "__main__":
cli()

149
scone/head/cli/freezer.py Normal file
View File

@ -0,0 +1,149 @@
import os
import re
import sys
from os.path import join
from pathlib import Path
import toml
from nacl.encoding import URLSafeBase64Encoder
from scone.common.misc import eprint
from scone.head.secrets import SecretAccess
def cli() -> None:
    """
    Freezer CLI: encrypt ("freeze") and decrypt ("thaw") files and variables
    using the head's secret key.
    """
    args = sys.argv[1:]
    if len(args) < 1:
        eprint("Not enough arguments.")
        eprint("Usage: scone-freezer <subcommand>:")
        eprint(" freezefile <file> [small files only for now!]")
        eprint(" thawfile <file>")
        eprint(" freezevar <key> (value as 1 line in stdin)")
        eprint(" thawvar <key>")
        eprint(" genkey")
        eprint(" test")
        sys.exit(127)
    # Locate the head by walking up from the current working directory.
    cdir = Path(os.getcwd())
    while not Path(cdir, "scone.head.toml").exists():
        cdir = cdir.parent
        if len(cdir.parts) <= 1:
            eprint("Don't appear to be in a head. STOP.")
            sys.exit(1)
    with open(join(cdir, "scone.head.toml")) as head_toml:
        head_data = toml.load(head_toml)
    if "freezer" in head_data and "restaurant_id" in head_data["freezer"]:
        restaurant_id = head_data["freezer"]["restaurant_id"]
    else:
        eprint("Tip: Set a freezer.restaurant_id in your scone.head.toml")
        eprint(" to enable the ability to store your secret in the secret service.")
        restaurant_id = None
    secret_access = SecretAccess(restaurant_id)
    if args[0] == "genkey":
        assert len(args) == 1
        secret_access.generate_new()
    elif args[0] == "test":
        secret_access.get_existing()
        if secret_access.key:
            eprint("Great! Key found.")
        else:
            eprint("Oh no! Key not found.")
    elif args[0] == "freezefile":
        secret_access.get_existing()
        if not secret_access.key:
            eprint("No key found!")
            sys.exit(12)
        assert len(args) >= 2
        filepaths = [Path(p) for p in args[1:]]
        ec = 0
        for path in filepaths:
            if not path.exists():
                eprint(f"Can't freeze: no such file '{path}'")
                sys.exit(10)
        for path in filepaths:
            eprint(f"Freezing {path}")
            if not path.is_file():
                eprint(f"Can't freeze (skipping): not a regular file '{path}'")
                ec = 5
                # BUG FIX: without this `continue` (which the thawfile
                # branch has), we fell through and tried to read the
                # skipped non-regular file anyway.
                continue
            # slurping here for simplicity;
            file_bytes = path.read_bytes()
            enc_bytes = secret_access.encrypt_bytes(file_bytes)
            dest_path = Path(str(path) + ".frozen")
            dest_path.write_bytes(enc_bytes)
        sys.exit(ec)
    elif args[0] == "thawfile":
        secret_access.get_existing()
        if not secret_access.key:
            eprint("No key found!")
            sys.exit(12)
        assert len(args) >= 2
        filepaths = [Path(p) for p in args[1:]]
        ec = 0
        for path in filepaths:
            if not path.exists():
                eprint(f"Can't thaw: no such file '{path}'")
                sys.exit(10)
        for path in filepaths:
            eprint(f"Thawing {path}")
            if not path.is_file():
                eprint(f"Can't thaw (skipping): not a regular file '{path}'")
                ec = 5
                continue
            pathstr = str(path)
            if not pathstr.endswith(".frozen"):
                eprint(f"Can't thaw (skipping): not .frozen '{path}'")
                continue
            # slurping here for simplicity;
            file_bytes = path.read_bytes()
            dec_bytes = secret_access.decrypt_bytes(file_bytes)
            dest_path = Path(str(pathstr[: -len(".frozen")]))
            dest_path.write_bytes(dec_bytes)
        sys.exit(ec)
    elif args[0] == "freezevar":
        assert len(args) == 2
        secret_access.get_existing()
        if not secret_access.key:
            eprint("No key found!")
            sys.exit(12)
        key = args[1]
        eprint("Enter value to freeze: ", end="", flush=True)
        value = input()
        enc_b64 = secret_access.encrypt_bytes(
            value.encode(), encoder=URLSafeBase64Encoder
        ).decode()
        # Wrap the base64 at 78 columns for a tidy TOML multi-line string.
        n = 78
        str_contents = "\n".join(
            [" " + enc_b64[i : i + n] for i in range(0, len(enc_b64), n)]
        )
        print(f'{key} = """')
        print(str_contents)
        print('"""')
    elif args[0] == "thawvar":
        assert len(args) == 1
        secret_access.get_existing()
        if not secret_access.key:
            eprint("No key found!")
            sys.exit(12)
        eprint("Enter base64 to thaw (whitespace removed painlessly) then EOF (^D):")
        value = re.sub(r"\s", "", sys.stdin.read())
        dec_str = secret_access.decrypt_bytes(
            value.encode(), encoder=URLSafeBase64Encoder
        ).decode()
        print(dec_str)

View File

@ -0,0 +1,26 @@
import asyncio
import sys
from argparse import ArgumentParser
def cli() -> None:
    """Synchronous entry point for the menu-composer CLI."""
    exit_code = asyncio.get_event_loop().run_until_complete(cli_async())
    sys.exit(exit_code)
async def cli_async() -> int:
    """
    Parse the menu-composer arguments and dispatch to the subcommand.

    Returns a process exit code.
    """
    args = sys.argv[1:]
    parser = ArgumentParser(description="Compose a menu!")
    subs = parser.add_subparsers()
    supermarket = subs.add_parser("supermarket", help="generate a [[supermarket]] dish")
    supermarket.add_argument("url", help="HTTPS URL to download")
    supermarket.add_argument("-a", "--as", help="Alternative filename")
    supermarket.set_defaults(func=supermarket_cli)
    argp = parser.parse_args(args)
    # Robustness fix: with no subcommand given, argparse leaves `func`
    # unset and `argp.func` raised AttributeError. Show help instead.
    if not hasattr(argp, "func"):
        parser.print_help()
        return 2
    return await argp.func(argp)
async def supermarket_cli(argp) -> int:
    """Handle the `supermarket` subcommand. Not implemented yet."""
    return 0  # TODO

View File

@ -0,0 +1,366 @@
import asyncio
import json
import logging
import time
from asyncio import Queue
from enum import Enum
from hashlib import sha256
from typing import Any, Dict, List, NamedTuple, Optional, Tuple, Union
import aiosqlite
import attr
import canonicaljson
import cattr
from aiosqlite import Connection
from scone.common.misc import sha256_file
from scone.common.pools import Pools
from scone.head import Head, Recipe, Variables, recipe_name_getter
from scone.head.recipe import DepEle
# Route canonicaljson through the stdlib json module.
canonicaljson.set_json_library(json)

logger = logging.getLogger(__name__)
# TODO(security, low): how to prevent passwords being recovered from the
# paramhashes in a dependency store?
# TODO(correctness, perf): recipes with @src@0 slugs should not be registered
# to a slug.
def _canonicalise_dict(input: Dict[str, Any]) -> Dict[str, Any]:
output: Dict[str, Any] = {}
for key, value in input.items():
if isinstance(value, dict):
output[key] = _canonicalise_dict(value)
elif isinstance(value, set):
new_list = list(value)
new_list.sort()
output[key] = new_list
else:
output[key] = value
return output
def hash_dict(value: dict) -> str:
    """Return a stable SHA-256 hex digest of a (canonicalised) dict."""
    canonical = canonicaljson.encode_canonical_json(_canonicalise_dict(value))
    return sha256(canonical).hexdigest()
def paramhash_recipe(recipe: Recipe) -> str:
    """Hash a recipe's arguments, excluding its '.source' bookkeeping key."""
    params = dict(getattr(recipe, "_args"))
    # pop() without a default raises KeyError, like the original `del`.
    params.pop(".source")
    return hash_dict(params)
@attr.s(auto_attribs=True)
class DependencyBook:
    """Snapshot of everything a recipe's last run depended upon."""

    var_names: List[str]
    var_hash: str
    fridge_hashes: Dict[str, str]
    recipe_revisions: Dict[str, int]
    dyn_sous_file_hashes: Dict[str, str]

    async def can_skip_static(self, head: Head, recipe: Recipe) -> bool:
        """Return True iff the head-side (static) dependencies are unchanged."""
        from scone.default.steps.fridge_steps import search_in_fridge

        # Recompute the hash over the recorded variable names; a missing or
        # changed variable means we cannot skip.
        sous_vars = head.variables[recipe.get_host()]
        current = dict()
        for var_name in self.var_names:
            try:
                current[var_name] = sous_vars.get_dotted(var_name)
            except KeyError:
                return False
        if hash_dict(current) != self.var_hash:
            return False
        # Any fridge file that vanished locally or changed hash invalidates.
        for fridge_name, expected_hash in self.fridge_hashes.items():
            real_pathstr = search_in_fridge(head, fridge_name)
            if not real_pathstr:
                return False
            actual_hash = await asyncio.get_running_loop().run_in_executor(
                head.pools.threaded, sha256_file, real_pathstr
            )
            if actual_hash != expected_hash:
                return False
        return True

    def has_dynamic(self) -> bool:
        """Return True if sous-side files must be re-checked remotely."""
        return bool(self.dyn_sous_file_hashes)
class DependencyTracker:
    """
    Tracks the dependencies of a task and then inserts a row as needed.
    """

    def __init__(self, pools: Pools):
        self._vars: Dict[str, Any] = {}
        self._fridge: Dict[str, str] = {}
        self._recipe_revisions: Dict[str, int] = {}
        self._dyn_sous_files: Dict[str, str] = {}
        self._ignored = False
        self._pools = pools

    def ignore(self):
        """
        Call when dependency tracking is not desired (or not advanced enough to
        be useful.)
        """
        self._ignored = True

    async def register_fridge_file(self, fridge_path: str, real_path: str):
        """Record the hash of a fridge file the first time it is seen."""
        if fridge_path in self._fridge:
            return
        digest = await asyncio.get_running_loop().run_in_executor(
            self._pools.threaded, sha256_file, real_path
        )
        self._fridge[fridge_path] = digest

    def register_recipe(self, recipe: Recipe):
        """Record the recipe class's revision (if the class has a name)."""
        cls = recipe.__class__
        rec_name = recipe_name_getter(cls)
        if rec_name:
            # NOTE(review): _REVISION may be absent, storing None into a
            # Dict[str, int] — confirm intended.
            self._recipe_revisions[rec_name] = getattr(cls, "_REVISION", None)

    def register_variable(self, variable: str, value: Union[dict, str, int]):
        """Record that the task read this (dotted) variable."""
        self._vars[variable] = value

    def register_remote_file(self, file: str, file_hash: str):
        """Record the hash of a sous-side (dynamic) file dependency."""
        self._dyn_sous_files[file] = file_hash

    def make_depbook(self) -> Optional[DependencyBook]:
        """Freeze everything tracked so far, or None if tracking was ignored."""
        if self._ignored:
            return None
        return DependencyBook(
            list(self._vars.keys()),
            hash_dict(self._vars),
            dict(self._fridge),
            self._recipe_revisions,
            self._dyn_sous_files,
        )

    def get_j2_compatible_dep_var_proxies(
        self, variables: Variables
    ) -> Dict[str, "DependencyVarProxy"]:
        """Wrap each top-level variable tree in a tracking proxy for Jinja2."""
        return {
            name: DependencyVarProxy(self, subtree, name + ".")
            for name, subtree in variables.toplevel().items()
        }
class DependencyVarProxy:
    """
    Provides convenient access to variables that also properly tracks
    dependencies.
    """

    def __init__(
        self, dependency_tracker: DependencyTracker, variables: dict, prefix: str = ""
    ):
        self._dvp_dt: DependencyTracker = dependency_tracker
        # dotted prefix (including trailing ".") of this subtree
        self._dvp_prefix = prefix
        self._dvp_vars = variables

    def __getattr__(self, key: str):
        """Resolve *key*: leaves are registered as dependencies and returned;
        dict subtrees are wrapped in a nested proxy.

        :raises KeyError: if the variable does not exist.
        """
        fully_qualified_varname = self._dvp_prefix + key
        value = self._dvp_vars.get(key, ...)
        if value is ...:
            raise KeyError(f"Variable does not exist: {fully_qualified_varname}")
        elif isinstance(value, dict):
            # BUGFIX: the nested proxy must carry the accumulated dotted
            # prefix; it previously restarted at `key + "."`, so variables
            # nested two or more levels deep were registered under the
            # wrong (truncated) name.
            return DependencyVarProxy(
                self._dvp_dt, value, fully_qualified_varname + "."
            )
        else:
            self._dvp_dt.register_variable(fully_qualified_varname, value)
            return value
class DependencyCache:
    """
    SQLite-backed cache of DependencyBooks for dishes that were cooked
    before, keyed by (source file, host, recipe, slug, paramhash).
    """

    def __init__(self):
        # set by open(); typed non-optional for caller convenience
        self.db: Connection = None  # type: ignore
        # one timestamp for the whole run: rows touched this run carry it,
        # so sweep_old() can delete everything older
        self.time = int(time.time() * 1000)

    @classmethod
    async def open(cls, path: str) -> "DependencyCache":
        """Open (creating the schema if needed) the cache database at *path*."""
        dc = cls()
        dc.db = await aiosqlite.connect(path)
        await dc.db.executescript(
            """
            CREATE TABLE IF NOT EXISTS dishcache (
                source_file TEXT,
                host TEXT,
                recipe_id TEXT,
                slug TEXT,
                paramhash TEXT,
                dep_book TEXT,
                ts INT,
                PRIMARY KEY (source_file, host, recipe_id, slug, paramhash)
            );
            CREATE INDEX IF NOT EXISTS dishcache_ts ON dishcache (ts);
            """
        )
        await dc.db.commit()
        return dc

    async def sweep_old(self, host: str):
        """Delete rows for *host* that were not renewed during this run."""
        # TODO(scope creep) allow sweeping only certain source files
        #   so we can do partial execution.
        await self.db.execute(
            """
            DELETE FROM dishcache
                WHERE host = ?
                AND ts < ?
            """,
            (host, self.time),
        )
        await self.db.commit()

    async def inquire(self, recipe: Recipe) -> Optional[Tuple[int, DependencyBook]]:
        """Look up the cached DependencyBook for *recipe*.

        :return: ``(rowid, book)``, or None when the dish was never cached.
        :raises Exception: re-raises (after logging) cattr structuring errors.
        """
        paramhash = paramhash_recipe(recipe)
        rows = await self.db.execute_fetchall(
            """
            SELECT rowid, dep_book FROM dishcache
                WHERE source_file = ?
                AND host = ?
                AND recipe_id = ?
                AND paramhash = ?
                AND slug = ?
                LIMIT 1
            """,
            (
                recipe._args[".source"][0],
                recipe.get_host(),
                recipe_name_getter(recipe.__class__),
                paramhash,
                recipe._slug,
            ),
        )
        rows = list(rows)
        if not rows:
            return None
        (rowid, dep_book_json) = rows[0]
        try:
            dep_book = cattr.structure(json.loads(dep_book_json), DependencyBook)
        except Exception:
            logger.error(
                "Failed to structure DependencyBook: %s", dep_book_json, exc_info=True
            )
            raise
        return rowid, dep_book

    async def register(self, recipe: Recipe, dep_book: DependencyBook):
        """Insert or update (upsert) the cache row for *recipe*."""
        paramhash = paramhash_recipe(recipe)
        await self.db.execute(
            """
            INSERT INTO dishcache
                (source_file, host, recipe_id, slug, paramhash, dep_book, ts)
                VALUES (?, ?, ?, ?, ?, ?, ?)
                ON CONFLICT (source_file, host, recipe_id, paramhash, slug)
                DO UPDATE SET
                    dep_book = excluded.dep_book,
                    ts = excluded.ts
            """,
            (
                recipe._args[".source"][0],
                recipe.get_host(),
                recipe_name_getter(recipe.__class__),
                recipe._slug,
                paramhash,
                canonicaljson.encode_canonical_json(cattr.unstructure(dep_book)),
                self.time,
            ),
        )
        await self.db.commit()

    async def renew(self, rowid: int):
        """Refresh a row's timestamp so sweep_old() keeps it."""
        # TODO(perf): batch up many renews
        # NB: the former `LIMIT 1` was removed here: `UPDATE ... LIMIT` is a
        # syntax error on SQLite builds lacking
        # SQLITE_ENABLE_UPDATE_DELETE_LIMIT, and rowid is unique anyway.
        await self.db.execute(
            """
            UPDATE dishcache SET ts = ? WHERE rowid = ?;
            """,
            (self.time, rowid),
        )
        await self.db.commit()
class CheckOutcomeLabel(Enum):
    """Verdict of a dependency-cache check for a single recipe."""

    # Not in dependency cache, so must run.
    NOT_CACHED = 0
    # Dependency cache suggests we must rerun
    MUST_REDO = 1
    # Dependency cache suggests we are fine if dynamic dependencies haven't
    # changed
    CHECK_DYNAMIC = 2
    # Dependency cache says we can skip; there are no dynamic dependencies
    SAFE_TO_SKIP = 3
class DepCheckOutcome(NamedTuple):
    """Outcome of a dependency check: a label, plus the DependencyBook when
    dynamic dependencies still need verifying."""

    label: CheckOutcomeLabel
    book: Optional[DependencyBook]
async def run_dep_checks(
    head: Head, dep_cache: DependencyCache, order: List[List[DepEle]]
) -> Dict[Recipe, DepCheckOutcome]:
    """Check every recipe in *order* against the dependency cache.

    Feeds recipes through a bounded queue to 8 consumer tasks and returns a
    map of recipe -> check outcome. Rows whose static dependencies are
    unchanged have their cache timestamp renewed as a side effect.
    """
    # Bounded queue so the producer cannot race far ahead of the checkers.
    queue: Queue[Optional[Recipe]] = Queue(32)
    outcomes = {}

    async def consumer():
        while True:
            recipe = await queue.get()
            if not recipe:
                # None is the shutdown sentinel (one per consumer).
                break
            t = await dep_cache.inquire(recipe)
            if t:
                # we need to check if dependencies have changed…
                rowid, dep_book = t
                if await dep_book.can_skip_static(head, recipe):
                    # we will renew either way
                    await dep_cache.renew(rowid)
                    if dep_book.has_dynamic():
                        # has dynamic dependencies
                        outcomes[recipe] = DepCheckOutcome(
                            CheckOutcomeLabel.CHECK_DYNAMIC, dep_book
                        )
                    else:
                        # can skip!
                        outcomes[recipe] = DepCheckOutcome(
                            CheckOutcomeLabel.SAFE_TO_SKIP, None
                        )
                else:
                    outcomes[recipe] = DepCheckOutcome(
                        CheckOutcomeLabel.MUST_REDO, None
                    )
            else:
                outcomes[recipe] = DepCheckOutcome(CheckOutcomeLabel.NOT_CACHED, None)
            queue.task_done()

    async def producer():
        for course in order:
            for recipe in course:
                if isinstance(recipe, Recipe):
                    await queue.put(recipe)
        # Wait until every queued recipe has been fully processed…
        await queue.join()
        # …then stop each consumer with one sentinel apiece.
        for worker in consumers:
            await queue.put(None)

    consumers = [asyncio.create_task(consumer()) for _ in range(8)]
    await asyncio.gather(*consumers, producer(), return_exceptions=False)
    return outcomes

6
scone/head/exceptions.py Normal file
View File

@ -0,0 +1,6 @@
class CookingError(Exception):
    """
    Error in cooking.
    """

192
scone/head/kitchen.py Normal file
View File

@ -0,0 +1,192 @@
import asyncio
import logging
from asyncio import Future
from collections import defaultdict
from contextvars import ContextVar
from typing import Any, Coroutine, Dict, List, Optional, Tuple, Type, TypeVar
import attr
import cattr
from scone.common.chanpro import Channel, ChanProHead
from scone.default.utensils.dynamic_dependencies import CanSkipDynamic
from scone.head import Head, Recipe, sshconn
from scone.head.dependency_tracking import (
CheckOutcomeLabel,
DepCheckOutcome,
DependencyCache,
DependencyTracker,
)
from scone.head.recipe import DepEle, DependencySpec
from scone.sous import utensil_namer
from scone.sous.utensils import Utensil
# Module-wide logger for kitchen operations.
logger = logging.getLogger(__name__)
# The recipe currently being cooked on this asyncio task; lets helpers find
# their recipe without threading it through every call.
current_recipe: ContextVar[Recipe] = ContextVar("current_recipe")
# Result type variable for the typed utensil helpers on Kitchen.
A = TypeVar("A")
class Kitchen:
    """Cooks recipes on souss for one run: owns the SSH channel heads,
    per-recipe dependency trackers and utensil invocation helpers."""

    def __init__(
        self,
        head: Head,
        dependency_store: DependencyCache,
        notifying_provides: Dict[Recipe, List[DependencySpec]],
    ):
        # (host, user) -> future ChanProHead, so concurrent callers share a
        # single connection attempt
        self._chanproheads: Dict[Tuple[str, str], Future[ChanProHead]] = dict()
        self._dependency_store = dependency_store
        # lazily creates one tracker per recipe on first access
        self._dependency_trackers: Dict[Recipe, DependencyTracker] = defaultdict(
            lambda: DependencyTracker(head.pools)
        )
        self.head = head
        self._notifying_provides = notifying_provides
        # depspec -> whether the providing recipe reported a change
        self.notifications: Dict[DependencySpec, bool] = dict()

    def get_dependency_tracker(self):
        """Tracker for the recipe currently cooking on this asyncio task."""
        return self._dependency_trackers[current_recipe.get()]

    async def get_chanprohead(self, host: str, user: str) -> ChanProHead:
        """Return the SSH channel head for (host, user), opening it if needed."""

        async def new_conn():
            connection_details = self.head.souss[host]
            # XXX opt ckey =
            #  os.path.join(self.head.directory, connection_details["clientkey"])
            try:
                cp, root = await sshconn.open_ssh_sous(
                    connection_details["host"],
                    connection_details["user"],
                    None,
                    user,
                    connection_details["souscmd"],
                    connection_details.get("dangerous_debug_logging", False)
                )
            except Exception:
                logger.error("Failed to open SSH connection", exc_info=True)
                raise
            return ChanProHead(cp, root)

        hostuser = (host, user)
        if hostuser not in self._chanproheads:
            self._chanproheads[hostuser] = asyncio.create_task(new_conn())
        return await self._chanproheads[hostuser]

    async def run_epoch(
        self,
        epoch: List[DepEle],
        depchecks: Dict[Recipe, DepCheckOutcome],
        concurrency_limit_per_host: int = 5,
    ):
        """Cook one epoch: recipes not marked SAFE_TO_SKIP are grouped per
        host and cooked by a bounded worker pool for each host."""
        per_host_lists: Dict[str, List[Recipe]] = defaultdict(lambda: [])

        # sort into per-host lists
        for recipe in epoch:
            if isinstance(recipe, Recipe):
                if depchecks[recipe].label != CheckOutcomeLabel.SAFE_TO_SKIP:
                    per_host_lists[recipe.get_host()].append(recipe)

        coros: List[Coroutine] = []
        for host, recipes in per_host_lists.items():
            host_work_pool = HostWorkPool(recipes, depchecks)
            coros.append(host_work_pool.cook_all(self, concurrency_limit_per_host))

        await asyncio.gather(*coros, return_exceptions=False)

    async def start(self, utensil: Utensil) -> Channel:
        """Launch *utensil* on the current recipe's sous; return its channel."""
        utensil_name = utensil_namer(utensil.__class__)
        recipe = current_recipe.get()
        cph = await self.get_chanprohead(recipe.get_host(), recipe.get_user(self.head))
        # noinspection PyDataclass
        payload = cattr.unstructure(utensil)
        return await cph.start_command_channel(utensil_name, payload)

    # short alias: start a utensil, keep the channel
    ut = start

    async def start_and_consume(self, utensil: Utensil) -> Any:
        """Run a utensil and return its single result value."""
        channel = await self.start(utensil)
        return await channel.consume()

    # short alias: run, consume one value
    ut1 = start_and_consume

    async def start_and_consume_attrs_optional(
        self, utensil: Utensil, attr_class: Type[A]
    ) -> Optional[A]:
        """Run a utensil; structure its result into *attr_class*, or None."""
        value = await self.start_and_consume(utensil)
        if value is None:
            return None
        return cattr.structure(value, attr_class)

    # short alias: run, consume one optional attrs value
    ut1a = start_and_consume_attrs_optional

    async def start_and_consume_attrs(self, utensil: Utensil, attr_class: Type[A]) -> A:
        """Like start_and_consume_attrs_optional, but None is an error."""
        value = await self.start_and_consume_attrs_optional(utensil, attr_class)
        if value is None:
            raise ValueError("Received None")
        return value

    # short alias: run, consume one required attrs value
    ut1areq = start_and_consume_attrs

    async def start_and_wait_close(self, utensil: Utensil) -> Any:
        """Run a utensil and wait until its channel is closed."""
        channel = await self.start(utensil)
        return await channel.wait_close()

    # short alias: run, wait for close
    ut0 = start_and_wait_close

    async def _store_dependency(self, recipe: Recipe):
        """Persist the recipe's tracked dependencies (call after success).

        :raises KeyError: if the recipe was never tracked.
        """
        dependency_tracker = self._dependency_trackers.pop(recipe, None)
        if not dependency_tracker:
            raise KeyError(f"Recipe {recipe} has not been tracked.")
        depbook = dependency_tracker.make_depbook()
        if depbook:
            await self._dependency_store.register(recipe, depbook)
@attr.s(auto_attribs=True)
class HostWorkPool:
    """Cooks one host's recipes with a bounded number of concurrent workers."""

    jobs: List[Recipe]
    depchecks: Dict[Recipe, DepCheckOutcome]
    # shared claim index: each worker takes jobs[next_job] and bumps it
    next_job: int = 0

    async def cook_all(self, kitchen: Kitchen, concurrency_limit: int):
        """Run up to *concurrency_limit* workers until all jobs are cooked."""
        total = len(self.jobs)
        num_workers = min(total, concurrency_limit)

        async def worker():
            while self.next_job < total:
                job = self.jobs[self.next_job]
                self.next_job += 1
                current_recipe.set(job)
                outcome = self.depchecks.get(job)
                if (
                    outcome is not None
                    and outcome.label == CheckOutcomeLabel.CHECK_DYNAMIC
                ):
                    book = outcome.book
                    assert book is not None
                    skippable = await kitchen.ut1(
                        CanSkipDynamic(book.dyn_sous_file_hashes)
                    )
                    if skippable:
                        continue
                await job.cook(kitchen)
                # if successful, store dependencies
                await kitchen._store_dependency(job)
                for depspec in kitchen._notifying_provides.get(job) or ():
                    # default to changed if not told otherwise
                    kitchen.notifications.setdefault(depspec, True)

        await asyncio.gather(
            *(asyncio.create_task(worker()) for _ in range(num_workers))
        )

125
scone/head/menu_reader.py Normal file
View File

@ -0,0 +1,125 @@
import os
from os import path
from pathlib import Path
from typing import Any, Dict, Optional

import toml
class Menu:
    """Collection of per-host menus, keyed by hostspec."""

    def __init__(self):
        self.hostmenus = {}

    def get_host(self, name: str):
        """Return the HostMenu for *name*, creating it on first access."""
        try:
            return self.hostmenus[name]
        except KeyError:
            created = HostMenu()
            self.hostmenus[name] = created
            return created

    def __str__(self):
        rendered = ["Menu"]
        for hostspec, hostmenu in self.hostmenus.items():
            rendered.append(f" on {hostspec} :-")
            rendered.extend(" " + line for line in str(hostmenu).split("\n"))
            rendered.append("")
        return "\n".join(rendered)
class HostMenu:
    """Dishes for a single host, keyed by recipe name, then by dish slug."""

    def __init__(self):
        self.dishes = {}

    def __str__(self):
        rendered = ["Menu"]
        for recipe, dishes in self.dishes.items():
            rendered.append(f"- recipe {recipe}")
            rendered.extend(f"  - {slug} {args}" for slug, args in dishes.items())
            rendered.append("")
        return "\n".join(rendered)
def _parse_menu_key(key, default_hostspec):
    """Split a menu key into ``(hostspec, recipe)``.

    Accepted shapes: ``recipe``, ``host--recipe``, ``recipe--`` and
    ``host--recipe--`` (a trailing ``--`` or missing host selects
    *default_hostspec*). Raises ValueError otherwise.
    """
    parts = key.split("--")
    n = len(parts)
    if n == 1:
        # pg-db.synapse
        return default_hostspec, parts[0]
    if n == 2 and parts[1] == "":
        # fridge-copy--
        return default_hostspec, parts[0]
    if n == 2:
        # server1--pg-db.synapse
        return parts[0], parts[1]
    if n == 3 and parts[2] == "":
        # server2--fridge-copy--
        return parts[0], parts[1]
    raise ValueError(f"Don't understand key: {key}")


def parse_toml_menu_descriptor(
    filename: str, menu: Menu, default_hostspec: str, source_name: Optional[str] = None
) -> None:
    """Parse one TOML menu file into *menu* (mutated in place).

    :param filename: path of the TOML file to read.
    :param menu: Menu to populate.
    :param default_hostspec: hostspec used when a key does not name one.
    :param source_name: label recorded in each dish's ``.source``;
        defaults to *filename*. (Was mistyped as plain ``str``.)
    :raises ValueError: on unparseable keys or duplicate dish slugs.
    """
    source_name = source_name or filename
    with open(filename, "r") as f:
        menu_desc: Dict[str, Any] = toml.load(f)  # type: ignore

    # "-----" is a magic table of tweaks applying to every dish in this file.
    magic_tweaks = menu_desc.pop("-----", {})

    for key, dishes in menu_desc.items():
        hostspec, recipe = _parse_menu_key(key, default_hostspec)
        hostmenu = menu.get_host(hostspec)
        mdishes = hostmenu.dishes.setdefault(recipe, {})

        def add_dish(slug, args, source_ref):
            # shared registration path for named and anonymous dishes
            if slug in mdishes:
                raise ValueError(
                    f"Conflict in: Host {hostspec} Recipe {recipe} Dish Slug {slug}"
                )
            mdishes[slug] = args
            args[".source"] = (source_name, key, source_ref)
            args[".m"] = magic_tweaks

        if isinstance(dishes, dict):
            # named dishes: the slug comes from the TOML key
            for slug, args in dishes.items():
                add_dish(slug, args, slug)
        elif isinstance(dishes, list):
            # anonymous dishes: synthesise a slug from source and index
            for idx, args in enumerate(dishes):
                add_dish(f"@{source_name}@{idx}", args, idx)
def parse_toml_menu_descriptors(menu_dir: str) -> Menu:
    """Walk *menu_dir* and load every ``*.toml`` file into a fresh Menu."""
    menu = Menu()
    for dirpath, _dirs, filenames in os.walk(menu_dir):
        for filename in filenames:
            if not filename.endswith(".toml"):
                continue
            # load this as a menu file
            full_path = path.join(dirpath, filename)
            # the second-to-last dotted component names the default hostspec
            default_hostspec = filename.split(".")[-2]
            relative = str(Path(full_path).relative_to(menu_dir))
            parse_toml_menu_descriptor(full_path, menu, default_hostspec, relative)
    return menu

171
scone/head/recipe.py Normal file
View File

@ -0,0 +1,171 @@
import typing
from collections import defaultdict
from typing import Any, Dict, List, Optional, Set, Tuple, Union
import toposort
if typing.TYPE_CHECKING:
from scone.head import Head
from scone.head.kitchen import Kitchen
# (requirement kind, identifier, frozenset of extra identifying items)
DependencySpec = Tuple[str, str, frozenset]
# An element of the dependency graph: a recipe or a provided resource.
DepEle = Union["Recipe", DependencySpec]
class Preparation:
    """
    Preparation on a single host.

    This is done before we start deduplicating (could this be improved?).
    """

    def __init__(self, recipes: List["Recipe"]):
        self._to_process = recipes.copy()
        self.recipes = recipes
        self._dependencies: Dict[DepEle, Set[DepEle]] = {}
        self._recipe_now: Optional[Recipe] = None
        self._hard_needs: Set[DependencySpec] = set()
        self.notifying_provides: Dict[Recipe, List[DependencySpec]] = defaultdict(
            list
        )

    def _make_depspec_tuple(
        self, requirement: str, identifier: str, **extra_identifiers: Any
    ) -> DependencySpec:
        """Build a canonical depspec, defaulting host to the current recipe's."""
        if "host" not in extra_identifiers:
            assert self._recipe_now is not None
            extra_identifiers["host"] = self._recipe_now.get_host()
        return requirement, identifier, frozenset(extra_identifiers.items())

    def needs(
        self,
        requirement: str,
        identifier: str,
        hard: bool = False,
        **extra_identifiers: Any,
    ) -> None:
        """Declare that the current recipe depends on a resource."""
        assert self._recipe_now is not None
        spec = self._make_depspec_tuple(requirement, identifier, **extra_identifiers)
        self._dependencies.setdefault(self._recipe_now, set()).add(spec)
        if hard:
            # hard needs must be provided by someone, or prepare() fails
            self._hard_needs.add(spec)

    def provides(
        self,
        requirement: str,
        identifier: str,
        notifying: bool = False,
        **extra_identifiers: Any,
    ) -> None:
        """Declare that the current recipe provides a resource."""
        assert self._recipe_now is not None
        spec = self._make_depspec_tuple(requirement, identifier, **extra_identifiers)
        self._dependencies.setdefault(spec, set()).add(self._recipe_now)
        if notifying:
            self.notifying_provides[self._recipe_now].append(spec)

    def subrecipe(self, recipe: "Recipe"):
        """Queue a recipe spawned by the current one, inheriting its source."""
        assert self._recipe_now is not None
        self._to_process.append(recipe)
        self.recipes.append(recipe)
        args = getattr(recipe, "_args")
        if ".source" not in args:
            parent_file, parent_key, parent_slug = getattr(
                self._recipe_now, "_args"
            )[".source"]
            args[".source"] = (parent_file, parent_key + "-sub", parent_slug)

    def prepare(self, head: "Head") -> List[List]:
        """Run every recipe's prepare() and return topologically-sorted courses."""
        while self._to_process:
            current = self._to_process.pop()
            self._recipe_now = current
            current.prepare(self, head)

        for hard_need in self._hard_needs:
            if hard_need not in self._dependencies:
                raise ValueError(f"Hard need not satisfied (no entry): {hard_need}")
            if not self._dependencies[hard_need]:
                raise ValueError(f"Hard need not satisfied (empty): {hard_need}")

        # synthetic terminal node depending on every recipe
        self._dependencies[self._make_depspec_tuple(".internal", "completed")] = set(
            self.recipes
        )
        return list(toposort.toposort(self._dependencies))
def recipe_name_getter(c: typing.Type["Recipe"]) -> Optional[str]:
    """Return a recipe class's registered name (``_NAME``), or None."""
    return getattr(c, "_NAME", None)  # type: ignore
class Recipe:
    """Base class for a single dish: one piece of configuration on one host."""

    def __init__(self, host: str, slug: str, args: dict, head: "Head"):
        self._host = host
        self._slug = slug
        self._args = args

    def get_host(self):
        """Name of the sous (host) this recipe targets."""
        return self._host

    def get_tweak(self, name: str, default: Any) -> Any:
        """Look up tweak *name*: a per-dish ``.name`` entry wins over the
        file-wide ``.m`` magic table; otherwise *default* is returned."""
        dotname = f".{name}"
        if dotname in self._args:
            return self._args[dotname]
        if ".m" in self._args and name in self._args[".m"]:
            return self._args[".m"][name]
        return default

    def get_user(self, head: "Head") -> str:
        """User to cook as; defaults to the sous' configured login user."""
        user = self.get_tweak("user", head.souss[self._host]["user"])
        assert isinstance(user, str)
        return user

    @classmethod
    def from_menu(cls, host: str, slug: str, args: dict, head: "Head") -> "Recipe":
        """Alternate constructor used when loading dishes from menu files."""
        return cls(host, slug, args, head)

    def prepare(self, preparation: Preparation, head: "Head") -> None:
        """Declare needs/provides; subclasses should call super().prepare()."""
        preparation.needs("os-user", self.get_user(head))
        # TODO(feature) allow merging per-task and per-menu tweaks
        # TODO(feature) allow need/provide custom things, not just user-units
        afters = self.get_tweak("needs", None)
        if afters:
            for after in afters:
                if isinstance(after, list) and len(after) == 2:
                    # allow requesting custom needs
                    preparation.needs(after[0], after[1])
                elif isinstance(after, str):
                    preparation.needs("user-unit", after)
                else:
                    raise ValueError("needs tweak should be list of strings or pairs.")
        befores = self.get_tweak("provides", None)
        if befores:
            if isinstance(befores, str):
                preparation.provides("user-unit", befores)
            else:
                for before in befores:
                    if not isinstance(before, str):
                        raise ValueError("provides tweak should be list of strings.")
                    preparation.provides("user-unit", before)

    async def cook(self, kitchen: "Kitchen") -> None:
        """Apply the recipe on the sous. Subclasses must override this."""
        raise NotImplementedError

    def __str__(self):
        cls = self.__class__
        if hasattr(cls, "RECIPE_NAME"):
            return (
                f"{cls.RECIPE_NAME}({cls.__name__}) {self._slug} "  # type: ignore
                f"on {self._host} ({self._args})"
            )
        return f"{cls.__name__} {self._slug} on {self._host} ({self._args})"

102
scone/head/secrets.py Normal file
View File

@ -0,0 +1,102 @@
from typing import Optional
import nacl
import nacl.utils
import secretstorage
from nacl.encoding import RawEncoder, URLSafeBase64Encoder
from nacl.secret import SecretBox
from scone.common.misc import eprint
class SecretAccess:
    """Holds the symmetric 'freezer' key and encrypts/decrypts with it,
    optionally persisting the key in the D-Bus Secret Service."""

    def __init__(self, restaurant_identifier: Optional[str]):
        # identifier used to look the key up in the Secret Service;
        # None disables Secret Service integration entirely
        self.restaurant_identifier = restaurant_identifier
        # raw SecretBox key; populated by generate_new()/get_existing()
        self.key: Optional[bytes] = None

    def encrypt_bytes(self, data: bytes, encoder=RawEncoder) -> bytes:
        """Encrypt *data* with the held key (nonce chosen by NaCl)."""
        box = SecretBox(self.key)
        return box.encrypt(data, None, encoder)

    def decrypt_bytes(self, data: bytes, encoder=RawEncoder) -> bytes:
        """Decrypt *data* with the held key."""
        box = SecretBox(self.key)
        return box.decrypt(data, None, encoder)

    def generate_new(self):
        """Generate a fresh key, print it, and try to store it in the
        Secret Service (when a restaurant identifier is configured)."""
        eprint("Generating a new freezer key...")
        self.key = nacl.utils.random(SecretBox.KEY_SIZE)
        key_b64 = URLSafeBase64Encoder.encode(self.key)
        eprint("Your new key is: " + key_b64.decode())
        eprint("Pretty please store it in a safe place!")
        if not self.restaurant_identifier:
            eprint("No RI; not saving to SS")
            return
        eprint("Attempting to save it to the secret service...")
        eprint("(save it yourself anyway!)")
        with secretstorage.dbus_init() as connection:
            collection = secretstorage.get_default_collection(connection)
            # attributes identify our item among everything in the collection
            attributes = {
                "application": "Scone",
                "restaurant": self.restaurant_identifier,
            }
            items = list(collection.search_items(attributes))
            if items:
                eprint(
                    "Found secret sauce for this Restaurant already!"
                    " Will not overwrite."
                )
            else:
                eprint("Storing secret sauce for this Restaurant...")
                collection.create_item(
                    f"scone({self.restaurant_identifier}): secret sauce",
                    attributes,
                    key_b64,
                )
                eprint("OK!")

    def get_existing(self):
        """Load the key: via the Secret Service when possible, else manually."""
        if self.restaurant_identifier is not None:
            self.key = self._try_dbus_auth(self.restaurant_identifier)
        else:
            self.key = self._try_manual_entry()

    def _try_dbus_auth(self, restaurant_identifier: str) -> Optional[bytes]:
        """Fetch the key from the Secret Service; fall back to manual entry
        (and store the result) when it is not there yet."""
        eprint("Trying D-Bus Secret Service")
        try:
            with secretstorage.dbus_init() as connection:
                collection = secretstorage.get_default_collection(connection)
                attributes = {
                    "application": "Scone",
                    "restaurant": restaurant_identifier,
                }
                items = list(collection.search_items(attributes))
                if items:
                    eprint("Found secret sauce for this Restaurant, unlocking…")
                    items[0].unlock()
                    return URLSafeBase64Encoder.decode(items[0].get_secret())
                else:
                    eprint("Did not find secret sauce for this Restaurant.")
                    eprint("Enter it and I will try and store it...")
                    secret = self._try_manual_entry()
                    if secret is not None:
                        collection.create_item(
                            f"scone({restaurant_identifier}): secret sauce",
                            attributes,
                            URLSafeBase64Encoder.encode(secret),
                        )
                        return secret
                    return None
        # NOTE(review): only EOFError is handled here; secretstorage likely
        # raises its own exception types when D-Bus is absent — confirm.
        except EOFError:  # XXX what happens with no D-Bus
            return None

    def _try_manual_entry(self) -> Optional[bytes]:
        """Prompt for a urlsafe-base64 key on stdin; None if wrong length."""
        eprint("Manual entry required. Enter password for this restaurant: ", end="")
        key = URLSafeBase64Encoder.decode(input().encode())
        if len(key) != SecretBox.KEY_SIZE:
            eprint("Wrong size!")
            return None
        else:
            return key

63
scone/head/sshconn.py Normal file
View File

@ -0,0 +1,63 @@
import logging
from typing import Optional, Tuple
import asyncssh
from asyncssh import SSHClientConnection, SSHClientConnectionOptions, SSHClientProcess
from scone.common.chanpro import Channel, ChanPro
logger = logging.getLogger(__name__)
class AsyncSSHChanPro(ChanPro):
    """ChanPro transport running over an asyncssh process's stdio."""

    def __init__(self, connection: SSHClientConnection, process: SSHClientProcess):
        super(AsyncSSHChanPro, self).__init__(process.stdout, process.stdin)
        self._process = process
        self._connection = connection

    async def close(self) -> None:
        """Close the channel multiplexer, then the SSH process/connection."""
        await super(AsyncSSHChanPro, self).close()
        # BUGFIX: asyncssh's close() methods are synchronous (they only
        # initiate teardown and return None), so `await self._process.close()`
        # raised TypeError. Initiate the close, then await wait_closed().
        self._process.close()
        self._connection.close()
        await self._process.wait_closed()
        await self._connection.wait_closed()
async def open_ssh_sous(
    host: str,
    user: str,
    client_key: Optional[str],
    requested_user: str,
    sous_command: str,
    debug_logging: bool = False
) -> Tuple[ChanPro, Channel]:
    """Connect to *host* over SSH, start the sous process and perform the
    hello handshake.

    :param host: SSH host to connect to.
    :param user: SSH login user.
    :param client_key: optional path of a private key to authenticate with.
    :param requested_user: user the sous should run as (sudo when different).
    :param sous_command: command that starts the sous on the remote side.
    :param debug_logging: tee the sous' stdio into /tmp files for debugging.
    :return: (ChanPro transport, root channel) pair.
    """
    if client_key:
        opts = SSHClientConnectionOptions(username=user, client_keys=[client_key])
    else:
        opts = SSHClientConnectionOptions(username=user)

    conn: SSHClientConnection = await asyncssh.connect(host, options=opts)

    if requested_user != user:
        # switch to the requested user via sudo
        command = f"sudo -u {requested_user} {sous_command}"
    else:
        command = sous_command

    if debug_logging:
        command = f"tee /tmp/sconnyin-{requested_user} | {command} 2>/tmp/sconnyerr-{requested_user} " \
            f"| tee /tmp/sconnyout-{requested_user}"

    # encoding=None: the channel protocol is binary, not text
    process: SSHClientProcess = await conn.create_process(command, encoding=None)

    logger.debug("Constructing AsyncSSHChanPro...")
    cp = AsyncSSHChanPro(conn, process)
    logger.debug("Creating root channel...")
    ch = cp.new_channel(number=0, desc="Root channel")
    cp.start_listening_to_channels(default_route=None)
    logger.debug("Sending head hello...")
    await ch.send({"hello": "head"})
    logger.debug("Waiting for sous hello...")
    sous_hello = await ch.recv()
    logger.debug("Got sous hello... checking")
    assert isinstance(sous_hello, dict)
    assert sous_hello["hello"] == "sous"
    logger.debug("Valid sous hello...")
    return cp, ch

27
scone/head/utils.py Normal file
View File

@ -0,0 +1,27 @@
from typing import Any, Optional, Type, TypeVar, cast
from typeguard import check_type as check_typeguard
_A = TypeVar("_A")
def check_type(value: Any, check_type: Type[_A], name: str = "value") -> _A:
    """Assert (via typeguard) that *value* matches *check_type* and return
    it, narrowed to that type."""
    check_typeguard(name, value, check_type)
    return cast(_A, value)
def check_type_opt(
    value: Any, check_type: Type[_A], name: str = "value"
) -> Optional[_A]:
    """Like check_type, but None is also accepted."""
    check_typeguard(name, value, Optional[check_type])
    return cast(_A, value)
def check_type_adv(value: Any, check_type: Any, name: str = "value") -> Any:
    """Typeguard check that also accepts typing special forms (Union etc.)."""
    # permitted to use special forms with this one
    check_typeguard(name, value, check_type)
    return value

169
scone/head/variables.py Normal file
View File

@ -0,0 +1,169 @@
from enum import Enum
from typing import Any, Dict, List, NamedTuple
# One lexeme of a variable expression: kind is "literal" or "variable".
ExpressionPart = NamedTuple("ExpressionPart", [("kind", str), ("value", str)])


def flatten_dict(nested: Dict[str, Any]) -> Dict[str, Any]:
    """Collapse nested string-keyed dicts into dotted keys (``{"a.b": v}``).

    Any subtree (or the whole dict) containing a non-string key is kept
    as-is, since it cannot be expressed with dotted names.
    """
    if any(not isinstance(key, str) for key in nested):
        # not possible to flatten
        return nested
    flattened: Dict[str, Any] = {}
    for key, value in nested.items():
        if not (isinstance(value, dict) and value):
            # scalars and empty dicts pass straight through
            flattened[key] = value
            continue
        sub_flat = flatten_dict(value)
        if all(isinstance(sub_key, str) for sub_key in sub_flat):
            # can flatten
            for sub_key, sub_value in sub_flat.items():
                flattened[f"{key}.{sub_key}"] = sub_value
        else:
            flattened[key] = value
    return flattened


class ExprParsingState(Enum):
    """Lexer state for parse_expr."""

    NORMAL = 1
    DOLLAR = 2
    VARIABLE_NAME = 3


def parse_expr(expr: str) -> List[ExpressionPart]:
    """Split *expr* into literal and ``${variable}`` parts.

    ``$$`` is an escaped dollar sign; a ``$`` not followed by ``{`` stays
    literal. Raises ValueError on an unterminated ``${...}``.
    """
    state = ExprParsingState.NORMAL
    acc = ""
    parts: List[ExpressionPart] = []
    for ch in expr:
        if state == ExprParsingState.NORMAL:
            if ch == "$":
                state = ExprParsingState.DOLLAR
            else:
                acc += ch
        elif state == ExprParsingState.DOLLAR:
            if ch == "$":
                # escaped dollar sign
                acc += "$"
                state = ExprParsingState.NORMAL
            elif ch == "{":
                state = ExprParsingState.VARIABLE_NAME
                if acc:
                    parts.append(ExpressionPart("literal", acc))
                    acc = ""
            else:
                # lone "$" followed by something else: keep both literally
                acc += "$" + ch
                state = ExprParsingState.NORMAL
        elif state == ExprParsingState.VARIABLE_NAME:
            if ch == "}":
                state = ExprParsingState.NORMAL
                parts.append(ExpressionPart("variable", acc))
                acc = ""
            else:
                acc += ch
    if state != ExprParsingState.NORMAL:
        raise ValueError(f"Wrong end state: {state}")
    if acc:
        parts.append(ExpressionPart("literal", acc))
    return parts
def merge_right_into_left_inplace(left: dict, right: dict):
    """Deep-merge *right* into *left* in place; non-dict values from *right*
    overwrite whatever *left* had."""
    for key, incoming in right.items():
        existing = left.get(key)
        if isinstance(incoming, dict) and isinstance(existing, dict):
            merge_right_into_left_inplace(existing, incoming)
        else:
            left[key] = incoming
class Variables:
    """Dotted-name variable store with ``${var}`` substitution."""

    def __init__(self):
        self._vars: Dict[str, Any] = {}

    def get_dotted(self, name: str) -> Any:
        """Return the value at dotted *name*; raise KeyError when absent."""
        current = self._vars
        keys = name.split(".")
        try:
            for k in keys:
                current = current[k]
            return current
        except KeyError:
            raise KeyError("No variable: " + name)

    def has_dotted(self, name: str) -> bool:
        """Return True when dotted *name* resolves to a value."""
        try:
            self.get_dotted(name)
            return True
        except KeyError:
            return False

    def set_dotted(self, name: str, value: Any):
        """Set dotted *name*, creating intermediate dicts as needed."""
        current = self._vars
        keys = name.split(".")
        for k in keys[:-1]:
            if k not in current:
                current[k] = {}
            current = current[k]
        current[keys[-1]] = value

    def _eval_with_incoming(self, expr: str, incoming: Dict[str, str]) -> Any:
        """Evaluate *expr*, lazily resolving unknown variables from *incoming*
        (which is mutated: used entries are popped and stored).

        A pure ``${var}`` expression keeps the variable's native type;
        otherwise all parts are stringified and concatenated.

        :raises KeyError: when a referenced variable exists nowhere.
        """
        parsed = parse_expr(expr)
        if len(parsed) == 1 and parsed[0].kind == "variable":
            var_name = parsed[0].value
            if self.has_dotted(var_name):
                return self.get_dotted(var_name)
            elif var_name in incoming:
                sub_expr = incoming.pop(var_name)
                sub_val = self._eval_with_incoming(sub_expr, incoming)
                self.set_dotted(var_name, sub_val)
                return sub_val
            else:
                # BUGFIX: report the missing variable's name (the message
                # previously interpolated the whole `incoming` dict)
                raise KeyError(f"No variable '{var_name}'")
        out = ""
        for part in parsed:
            if part.kind == "literal":
                out += part.value
            elif part.kind == "variable":
                # BUGFIX: was `parsed[0].value`, which always substituted the
                # first part's text instead of the variable being visited
                var_name = part.value
                if self.has_dotted(var_name):
                    out += str(self.get_dotted(var_name))
                elif var_name in incoming:
                    sub_expr = incoming.pop(var_name)
                    sub_val = self._eval_with_incoming(sub_expr, incoming)
                    self.set_dotted(var_name, sub_val)
                    out += str(sub_val)
                else:
                    raise KeyError(f"No variable '{var_name}'")
        return out

    def load_vars_with_substitutions(self, incoming: Dict[str, Any]):
        """Load a (nested) mapping, evaluating ``${...}`` substitutions in
        dependency order."""
        incoming = flatten_dict(incoming)
        while incoming:
            key, expr = incoming.popitem()
            value = self._eval_with_incoming(expr, incoming)
            self.set_dotted(key, value)

    def eval(self, expr: str) -> Any:
        """Evaluate *expr* against already-loaded variables only."""
        return self._eval_with_incoming(expr, {})

    def load_plain(self, incoming: Dict[str, Any]):
        """Merge a nested mapping in without any substitution."""
        merge_right_into_left_inplace(self._vars, incoming)

    def substitute_inplace_in_dict(self, dictionary: Dict[str, Any]):
        """Recursively evaluate every string value of *dictionary* in place."""
        for k, v in dictionary.items():
            if isinstance(v, dict):
                self.substitute_inplace_in_dict(v)
            elif isinstance(v, str):
                dictionary[k] = self.eval(v)

    def toplevel(self):
        """Raw top-level variable mapping (used to build var proxies)."""
        return self._vars

27
scone/sous/__init__.py Normal file
View File

@ -0,0 +1,27 @@
from os import path
import toml
from scone.common.loader import ClassLoader
from scone.sous.utensils import Utensil, utensil_namer
class Sous:
    """A sous (remote worker): chiefly a loader of utensil classes."""

    def __init__(self, ut_loader: ClassLoader[Utensil]):
        self.utensil_loader = ut_loader

    @staticmethod
    def open(directory: str):
        """Read ``scone.sous.toml`` from *directory* and build a Sous."""
        with open(path.join(directory, "scone.sous.toml")) as sous_toml:
            sous_data = toml.load(sous_toml)

        roots = sous_data.get("utensil_roots", ["scone.default.utensils"])

        # load available recipes
        loader: ClassLoader[Utensil] = ClassLoader(Utensil, utensil_namer)
        for root in roots:
            loader.add_package_root(root)

        return Sous(loader)

101
scone/sous/__main__.py Normal file
View File

@ -0,0 +1,101 @@
import asyncio
import logging
import os
import pwd
import sys
from pathlib import Path
from typing import List, cast
import cattr
from scone.common.chanpro import Channel, ChanPro
from scone.common.pools import Pools
from scone.sous import Sous, Utensil
from scone.sous.utensils import Worktop
logger = logging.getLogger(__name__)
async def main(args: List[str]):
    """Sous entry point: speak the channel protocol over stdio and dispatch
    utensil commands from the head until EOF.

    ``args[0]`` must be the sous config directory.
    """
    # loop = asyncio.get_event_loop()
    # reader = asyncio.StreamReader()
    # reader_protocol = asyncio.StreamReaderProtocol(reader)
    # await loop.connect_read_pipe(lambda: reader_protocol, sys.stdin.buffer)
    #
    # writer_transport, writer_protocol = await loop.connect_write_pipe(
    #     FlowControlMixin, sys.stdout.buffer)
    # writer = StreamWriter(writer_transport, writer_protocol, None, loop)
    # data = await reader.readexactly(5)
    #
    # writer.write(data)
    # await writer.drain()
    logging.basicConfig(level=logging.DEBUG)
    if len(args) < 1:
        raise RuntimeError("Needs to be passed a sous config directory as 1st arg!")
    sous = Sous.open(args[0])
    logger.debug("Sous created")
    cp = await ChanPro.open_from_stdio()
    # channel 0 carries control messages (new-channel requests etc.)
    root = cp.new_channel(0, "Root channel")
    cp.start_listening_to_channels(default_route=root)
    # hello handshake: we announce ourselves, the head must answer
    await root.send({"hello": "sous"})
    remote_hello = await root.recv()
    assert isinstance(remote_hello, dict)
    assert remote_hello["hello"] == "head"
    # per-user worktop directory that persists between runs
    sous_user = pwd.getpwuid(os.getuid()).pw_name
    quasi_pers = Path(args[0], "worktop", sous_user)
    if not quasi_pers.exists():
        quasi_pers.mkdir(parents=True)
    worktop = Worktop(quasi_pers, Pools())
    logger.info("Worktop dir is: %s", worktop.dir)
    while True:
        try:
            message = await root.recv()
        except EOFError:
            # head disconnected; shut down cleanly
            break
        if "nc" in message:
            # start a new command channel
            channel_num = message["nc"]
            command = message["cmd"]
            payload = message["pay"]
            utensil_class = sous.utensil_loader.get_class(command)
            utensil = cast(Utensil, cattr.structure(payload, utensil_class))
            channel = cp.new_channel(channel_num, command)
            logger.debug("going to sched task with %r", utensil)
            asyncio.create_task(run_utensil(utensil, channel, worktop))
        elif "lost" in message:
            # for a then-non-existent channel, but probably just waiting on us
            # retry without a default route.
            await cp.handle_incoming_message(message["lost"])
        else:
            raise RuntimeError(f"Unknown ch0 message {message}")
async def run_utensil(utensil: Utensil, channel: Channel, worktop: Worktop):
    """Execute one utensil and close its channel with an appropriate reason.

    Any exception is logged rather than propagated, so a broken utensil
    cannot take down the whole sous process.
    """
    failed = False
    try:
        await utensil.execute(channel, worktop)
    except Exception:
        failed = True
        logger.error("Unhandled Exception in Utensil", exc_info=True)

    if failed:
        await channel.close("Exception in utensil")
    else:
        logger.debug("Utensil finished with normal reason")
        await channel.close("Utensil complete")
if __name__ == "__main__":
    # asyncio.run (Python 3.7+) creates a fresh event loop and closes it on
    # exit; asyncio.get_event_loop().run_until_complete() is deprecated for
    # this purpose since Python 3.10 and leaves the loop open.
    asyncio.run(main(sys.argv[1:]))

26
scone/sous/utensils.py Normal file
View File

@ -0,0 +1,26 @@
from pathlib import Path
from typing import Type, TypeVar
from scone.common.chanpro import Channel
from scone.common.pools import Pools
T = TypeVar("T")
class Worktop:
    """A mostly-persistent on-disk working area, plus shared pools, made
    available to utensils while they execute."""

    def __init__(self, dir: Path, pools: Pools):
        self.pools = pools  # shared Pools instance for utensils to draw on
        self.dir = dir  # mostly-persistent worktop space for utensils
class Utensil:
    """Base class for operations the head can ask a sous to perform.

    Subclasses are looked up by name and populated from a message payload,
    then run via :meth:`execute`.
    """

    def __init__(self):
        pass

    async def execute(self, channel: Channel, worktop: Worktop):
        """Carry out this utensil's work; subclasses must override.

        :param channel: dedicated channel for talking back to the head.
        :param worktop: persistent working area and shared pools.
        """
        raise NotImplementedError
def utensil_namer(c: Type) -> str:
    """Return the fully-qualified dotted name (module.ClassName) for a
    utensil class; used as its wire identifier."""
    return ".".join((c.__module__, c.__name__))

18
scripts-dev/lint.sh Executable file
View File

@ -0,0 +1,18 @@
#!/bin/sh -eu
# Run the project's linters/formatters (isort, black, flake8, mypy) over the
# paths given as arguments, or over the default source trees when none are
# given.  -e: abort on the first failing tool; -u: treat unset vars as errors.

if [ $# -ge 1 ]
then
  # Lint only the paths supplied on the command line.
  files=$*
else
  # Default: the package itself and its tests.
  files="scone tests"
fi

echo "Linting these locations: $files"

echo " ===== Running isort ===== "
isort $files

echo " ===== Running black ===== "
black $files

echo " ===== Running flake8 ===== "
flake8 $files

echo " ===== Running mypy ===== "
mypy $files

21
setup.cfg Normal file
View File

@ -0,0 +1,21 @@
[flake8]
# line length defaulted to by black
max-line-length = 88
# see https://pycodestyle.readthedocs.io/en/latest/intro.html#error-codes
# for error codes. The ones we ignore are:
# W503: line break before binary operator
# W504: line break after binary operator
# E203: whitespace before ':' (black formats slices with this whitespace, so we ignore it)
# (this is a subset of those ignored in Synapse)
ignore=W503,W504,E203
[isort]
line_length = 88
sections=FUTURE,STDLIB,THIRDPARTY,FIRSTPARTY,TESTS,LOCALFOLDER
default_section=THIRDPARTY
known_first_party=scone
known_tests=tests
multi_line_output=3
include_trailing_comma=true
combine_as_imports=true

158
setup.py Normal file
View File

@ -0,0 +1,158 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# Note: To use the 'upload' functionality of this file, you must:
# $ pipenv install twine --dev
import io
import os
import sys
from shutil import rmtree
from setuptools import find_packages, setup, Command
# Package meta-data.
NAME = 'scone'
DESCRIPTION = 'Simple CONfiguration Engine'
URL = 'https://librepush.net/project/scone'
EMAIL = 'rei@librepush.net'
AUTHOR = 'Olivier \'reivilibre\''
REQUIRES_PYTHON = '>=3.7.0'
VERSION = '0.1.0'

# What packages are required for this module to be executed?
REQUIRED = [
    "cbor2~=5.1.1",
    "setuptools~=49.1.2",
    "toml~=0.10.1",
    "attrs~=19.3.0",
    "cattrs~=1.0.0",
    "canonicaljson~=1.2.0"
]

# Sous extras: the base set is always installed for a sous; PostgreSQL
# support is separated out so it can be skipped.
EX_SOUS_BASE = []
EX_SOUS_PG = ["asyncpg"]

EX_SOUS_ALL = EX_SOUS_BASE + EX_SOUS_PG

# What packages are optional?
EXTRAS = {
    "head": [
        "SecretStorage~=3.1.2",
        "asyncssh[libnacl]~=2.2.1",
        "toposort~=1.5",
        "pynacl",
        "aiosqlite~=0.15.0",
        "requests",
        "Jinja2",
        "typeguard"
    ],
    "sous": EX_SOUS_ALL,
    "sous-core": EX_SOUS_BASE,
    "sous-pg": EX_SOUS_PG
}

# The rest you shouldn't have to touch too much :)
# ------------------------------------------------
# Except, perhaps the License and Trove Classifiers!
# If you do change the License, remember to change the Trove Classifier for that!

here = os.path.abspath(os.path.dirname(__file__))

# Import the README and use it as the long-description.
# Note: this will only work if 'README.md' is present in your MANIFEST.in file!
try:
    # The builtin open() is used directly; io.open is merely an alias for it
    # on Python 3.
    with open(os.path.join(here, 'README.md'), encoding='utf-8') as f:
        long_description = '\n' + f.read()
except FileNotFoundError:
    # No README available (e.g. sdist without it): fall back to the summary.
    long_description = DESCRIPTION

# Load the package's __version__.py module as a dictionary.
about = {}
if not VERSION:
    project_slug = NAME.lower().replace("-", "_").replace(" ", "_")
    with open(os.path.join(here, project_slug, '__version__.py')) as f:
        exec(f.read(), about)
else:
    about['__version__'] = VERSION
class UploadCommand(Command):
    """Support setup.py upload."""

    # Shown by `setup.py --help-commands`.
    description = 'Build and publish the package.'
    # This command takes no command-line options.
    user_options = []

    @staticmethod
    def status(s):
        """Prints things in bold."""
        print('\033[1m{0}\033[0m'.format(s))

    def initialize_options(self):
        # Required by the Command interface; nothing to initialise.
        pass

    def finalize_options(self):
        # Required by the Command interface; nothing to finalise.
        pass

    def run(self):
        """Build sdist + universal wheel, upload via twine, then tag and push."""
        try:
            self.status('Removing previous builds…')
            rmtree(os.path.join(here, 'dist'))
        except OSError:
            # No previous dist/ directory — nothing to clean up.
            pass

        self.status('Building Source and Wheel (universal) distribution…')
        # Re-invoke this setup.py with the current interpreter to build.
        os.system('{0} setup.py sdist bdist_wheel --universal'.format(sys.executable))

        self.status('Uploading the package to PyPI via Twine…')
        os.system('twine upload dist/*')

        self.status('Pushing git tags…')
        os.system('git tag v{0}'.format(about['__version__']))
        os.system('git push --tags')

        sys.exit()
# Where the magic happens:
setup(
    name=NAME,
    version=about['__version__'],
    description=DESCRIPTION,
    long_description=long_description,
    long_description_content_type='text/markdown',
    author=AUTHOR,
    author_email=EMAIL,
    python_requires=REQUIRES_PYTHON,
    url=URL,
    packages=find_packages(exclude=["tests", "*.tests", "*.tests.*", "tests.*"]),
    # If your package is a single module, use this instead of 'packages':
    # py_modules=['mypackage'],
    # Console scripts installed onto PATH; both point at head-side CLIs.
    entry_points={
        'console_scripts': [
            'scone=scone.head.cli:cli',
            'scone-freezer=scone.head.cli.freezer:cli'
        ],
    },
    install_requires=REQUIRED,
    extras_require=EXTRAS,
    include_package_data=True,
    # TODO license='GPL3',
    classifiers=[
        # Trove classifiers
        # Full list: https://pypi.python.org/pypi?%3Aaction=list_classifiers
        'Programming Language :: Python',
        'Programming Language :: Python :: 3',
        'Programming Language :: Python :: 3.7',
        'Programming Language :: Python :: Implementation :: CPython',
        'Programming Language :: Python :: Implementation :: PyPy'
    ],
    # $ setup.py publish support.
    cmdclass={
        'upload': UploadCommand,
    },
)

34
tox.ini Normal file
View File

@ -0,0 +1,34 @@
[tox]
envlist = py37, check_codestyle, check_types
[testenv]
# As of twisted 16.4, trial tries to import the tests as a package (previously
# it loaded the files explicitly), which means they need to be on the
# pythonpath. Our sdist doesn't include the 'tests' package, so normally it
# doesn't work within the tox virtualenv.
#
# As a workaround, we tell tox to install with 'pip install -e', which just
# creates a symlink to the project directory instead of unpacking the sdist.
usedevelop=true
commands = trial tests
[testenv:check_codestyle]
skip_install = True
deps =
# pin deps so we don't start failing when they are updated
flake8==3.8.3
black==19.10b0
isort~=5.0
commands =
flake8 .
black --check --diff .
isort --check-only --diff .
[testenv:check_types]
deps =
mypy==0.780
mypy-zope==0.2.7
commands =
mypy .