Misc improvements and cleanup (#11)

* Improve and quieten logging

* Remove some debug printing

* Add TODO to systemd

* Add michelin CLI program to help composing menus

* Try to harden apt against locking issues

Not too successfully still. :(

* Improve supermarket downloading

* Make HashFile give None if file does not exist

* Fix grammar to support edges properly

* Make a first pass at bringing up explicit edge support

* Add michelin as a command

* Antilint
This commit is contained in:
reivilibre 2020-11-01 19:37:49 +00:00 committed by GitHub
parent 6c5c14a58b
commit 4cec0d998e
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
12 changed files with 359 additions and 114 deletions

View File

@ -74,7 +74,7 @@ class ChanPro:
idx = 0
while True:
message = await self._recv_dict()
logger.debug("<message> %d %r", idx, message)
# logger.debug("<message> %d %r", idx, message)
idx += 1
await self.handle_incoming_message(message, default_route=default_route)

View File

@ -1,4 +1,6 @@
from typing import Dict, List, Set, Tuple
import asyncio
import logging
from typing import List
from scone.default.utensils.basic_utensils import SimpleExec
from scone.head.head import Head
@ -6,66 +8,98 @@ from scone.head.kitchen import Kitchen, Preparation
from scone.head.recipe import Recipe, RecipeContext
from scone.head.utils import check_type
logger = logging.getLogger(__name__)
class AptInstallInternal(Recipe):
"""
Actually installs the packages; does it in a single batch for efficiency!
"""
_NAME = "apt-install.internal"
# TODO(extension, low): expand this into apt-install-now if we need
# the flexibility
def __init__(self, recipe_context: RecipeContext, args: dict, head):
super().__init__(recipe_context, args, head)
self.packages: Set[str] = set()
args["packages"] = self.packages
args[".source"] = ("@virtual", "apt-install-internal", "the one true AII")
def get_user(self, head: "Head") -> str:
return "root"
def prepare(self, preparation: Preparation, head: "Head") -> None:
super().prepare(preparation, head)
preparation.needs("apt-stage", "internal-install-packages")
preparation.needs("apt-stage", "repositories-declared")
preparation.provides("apt-stage", "packages-installed")
async def cook(self, kitchen: Kitchen) -> None:
# apt-installs built up the args to represent what was needed, so this
# will work as-is
kitchen.get_dependency_tracker()
if self.packages:
update = await kitchen.ut1areq(
SimpleExec(["apt-get", "-yq", "update"], "/"), SimpleExec.Result
)
if update.exit_code != 0:
raise RuntimeError(
f"apt update failed with err {update.exit_code}: {update.stderr!r}"
)
install_args = ["apt-get", "-yq", "install"]
install_args += list(self.packages)
install = await kitchen.ut1areq(
SimpleExec(install_args, "/"), SimpleExec.Result
)
if install.exit_code != 0:
raise RuntimeError(
f"apt install failed with err {install.exit_code}:"
f" {install.stderr!r}"
)
# class AptInstallInternal(Recipe):
# """
# Actually installs the packages; does it in a single batch for efficiency!
# """
#
# _NAME = "apt-install.internal"
#
# # TODO(extension, low): expand this into apt-install-now if we need
# # the flexibility
#
# def __init__(self, recipe_context: RecipeContext, args: dict, head):
# super().__init__(recipe_context, args, head)
#
# self.packages: Set[str] = set()
#
# args["packages"] = self.packages
# args[".source"] = ("@virtual", "apt-install-internal", "the one true AII")
#
# def get_user(self, head: "Head") -> str:
# return "root"
#
# def prepare(self, preparation: Preparation, head: "Head") -> None:
# super().prepare(preparation, head)
# preparation.needs("apt-stage", "internal-install-packages")
# preparation.needs("apt-stage", "repositories-declared")
# preparation.provides("apt-stage", "packages-installed")
#
# async def _apt_command(self, kitchen: Kitchen, args: List[str]) ->
# SimpleExec.Result:
# # lock_path = "/var/lib/apt/lists/lock"
# lock_path = "/var/lib/dpkg/lock"
#
# retries = 3
#
# while retries > 0:
# result = await kitchen.ut1areq(
# SimpleExec(args, "/"), SimpleExec.Result
# )
#
# if result.exit_code == 0 or b"/lock" not in result.stderr:
# return result
#
# logger.
#
# retries -= 1
#
# # /lock seen in stderr, probably a locking issue...
# lock_check = await kitchen.ut1areq(SimpleExec(
# ["fuser", lock_path],
# "/"
# ), SimpleExec.Result)
#
# if lock_check.exit_code != 0:
# # non-zero code means the file is not being accessed;
# # use up a retry (N.B. we retry because this could be racy...)
# retries -= 1
#
# await asyncio.sleep(2.0)
#
# return result # noqa
#
#
# async def cook(self, kitchen: Kitchen) -> None:
# # apt-installs built up the args to represent what was needed, so this
# # will work as-is
# kitchen.get_dependency_tracker()
#
# if self.packages:
# update = await self._apt_command(kitchen, ["apt-get", "-yq", "update"])
# if update.exit_code != 0:
# raise RuntimeError(
# f"apt update failed with err {update.exit_code}:
# {update.stderr!r}"
# )
#
# install_args = ["apt-get", "-yq", "install"]
# install_args += list(self.packages)
# install = await self._apt_command(kitchen, install_args)
#
# if install.exit_code != 0:
# raise RuntimeError(
# f"apt install failed with err {install.exit_code}:"
# f" {install.stderr!r}"
# )
class AptPackage(Recipe):
_NAME = "apt-install"
internal_installers: Dict[Tuple[Head, str], AptInstallInternal] = {}
def __init__(self, recipe_context: RecipeContext, args: dict, head):
super().__init__(recipe_context, args, head)
self.packages: List[str] = check_type(args["packages"], list)
@ -76,14 +110,49 @@ class AptPackage(Recipe):
for package in self.packages:
preparation.provides("apt-package", package)
async def _apt_command(
self, kitchen: Kitchen, args: List[str]
) -> SimpleExec.Result:
retries = 3
while retries > 0:
result = await kitchen.ut1areq(SimpleExec(args, "/"), SimpleExec.Result)
if result.exit_code == 0 or b"/lock" not in result.stderr:
return result
logger.warning(
"Failed apt command due to suspected locking issue. Will retry…"
)
retries -= 1
# /lock seen in stderr, probably a locking issue...
lock_check = await kitchen.ut1areq(
SimpleExec(
["fuser", "/var/lib/dpkg/lock", "/var/lib/apt/lists/lock"], "/"
),
SimpleExec.Result,
)
if lock_check.exit_code != 0:
# non-zero code means the file is not being accessed;
# use up a retry (N.B. we retry because this could be racy...)
logger.warning(
"Suspected locking issue is either racy or a red herring."
)
retries -= 1
await asyncio.sleep(2.0)
return result # noqa
async def cook(self, kitchen: Kitchen) -> None:
# this is a one-off task assuming everything works
kitchen.get_dependency_tracker()
if self.packages:
update = await kitchen.ut1areq(
SimpleExec(["apt-get", "-yq", "update"], "/"), SimpleExec.Result
)
update = await self._apt_command(kitchen, ["apt-get", "-yq", "update"])
if update.exit_code != 0:
raise RuntimeError(
f"apt update failed with err {update.exit_code}: {update.stderr!r}"
@ -91,9 +160,7 @@ class AptPackage(Recipe):
install_args = ["apt-get", "-yq", "install"]
install_args += list(self.packages)
install = await kitchen.ut1areq(
SimpleExec(install_args, "/"), SimpleExec.Result
)
install = await self._apt_command(kitchen, install_args)
if install.exit_code != 0:
raise RuntimeError(

View File

@ -1,9 +1,12 @@
import asyncio
import logging
import os
from asyncio import Future
from pathlib import Path
from typing import Dict, cast
from urllib.parse import urlparse
from urllib.request import urlretrieve
import requests
from scone.common.misc import sha256_file
from scone.common.modeutils import DEFAULT_MODE_FILE, parse_mode
@ -13,12 +16,14 @@ from scone.default.steps.fridge_steps import (
FridgeMetadata,
load_and_transform,
)
from scone.default.utensils.basic_utensils import Chown, WriteFile
from scone.default.utensils.basic_utensils import Chmod, Chown, HashFile, WriteFile
from scone.head.head import Head
from scone.head.kitchen import Kitchen, Preparation
from scone.head.recipe import Recipe, RecipeContext
from scone.head.utils import check_type
logger = logging.getLogger(__name__)
class FridgeCopy(Recipe):
"""
@ -118,6 +123,7 @@ class Supermarket(Recipe):
def prepare(self, preparation: Preparation, head: "Head"):
super().prepare(preparation, head)
preparation.provides("file", str(self.destination))
preparation.needs("directory", str(self.destination.parent))
async def cook(self, kitchen: "Kitchen"):
# need to ensure we download only once, even in a race…
@ -126,48 +132,82 @@ class Supermarket(Recipe):
kitchen.head.directory, SUPERMARKET_RELATIVE, self.sha256
)
if self.sha256 in Supermarket.in_progress:
await Supermarket.in_progress[self.sha256]
elif not supermarket_path.exists():
note = f"""
Scone Supermarket
logger.debug("Going to hash …")
This file corresponds to {self.url}
remote_hash = await kitchen.ut1(HashFile(str(self.destination)))
Downloaded by {self}
""".strip()
logger.debug(
"sha256 of %s: want %s have %r", self.destination, self.sha256, remote_hash
)
Supermarket.in_progress[self.sha256] = cast(
Future,
asyncio.get_running_loop().run_in_executor(
kitchen.head.pools.threaded,
self._download_file,
self.url,
str(supermarket_path),
self.sha256,
note,
),
)
if remote_hash != self.sha256:
if self.sha256 in Supermarket.in_progress:
logger.debug("Awaiting existing download")
await Supermarket.in_progress[self.sha256]
elif not supermarket_path.exists():
note = f"""
Scone Supermarket
# TODO(perf): load file in another thread
with open(supermarket_path, "r") as fin:
data = fin.read()
chan = await kitchen.start(WriteFile(str(self.destination), self.mode))
await chan.send(data)
await chan.send(None)
if await chan.recv() != "OK":
raise RuntimeError(f"WriteFail failed on supermarket to {self.destination}")
This file corresponds to {self.url}
Downloaded by {self}
""".strip()
Supermarket.in_progress[self.sha256] = cast(
Future,
asyncio.get_running_loop().run_in_executor(
kitchen.head.pools.threaded,
self._download_file,
self.url,
str(supermarket_path),
self.sha256,
note,
),
)
logger.debug("Awaiting new download")
await Supermarket.in_progress[self.sha256]
else:
logger.debug("Already in supermarket.")
# TODO(perf): load file in another thread
# TODO(perf): chunk file
with open(supermarket_path, "rb") as fin:
data = fin.read()
chan = await kitchen.start(WriteFile(str(self.destination), self.mode))
await chan.send(data)
await chan.send(None)
if await chan.recv() != "OK":
raise RuntimeError(
f"WriteFail failed on supermarket to {self.destination}"
)
await kitchen.ut0(Chown(str(self.destination), self.owner, self.group))
await kitchen.ut0(Chmod(str(self.destination), self.mode))
@staticmethod
def _download_file(url: str, dest_path: str, check_sha256: str, note: str):
urlretrieve(url, dest_path)
Path(dest_path).parent.mkdir(parents=True, exist_ok=True)
r = requests.get(url, stream=True)
with open(dest_path, "wb") as fp:
for chunk in r.iter_content(4 * 1024 * 1024):
fp.write(chunk)
real_sha256 = sha256_file(dest_path)
if real_sha256 != check_sha256:
try:
os.rename(dest_path, dest_path + ".bad")
except Exception:
try:
os.unlink(dest_path)
except Exception:
pass
raise RuntimeError(
f"sha256 hash mismatch {real_sha256} != {check_sha256} (wanted)"
)
with open(dest_path + ".txt", "w") as fout:
# leave a note so we can find out what this is if we need to.
fout.write(note)

View File

@ -14,6 +14,8 @@ class SystemdUnit(Recipe):
System unit.
TODO(performance): make it collapsible in a way so that it can daemon-reload
only once in most situations.
TODO(performance): deduplication.
"""
_NAME = "systemd"

View File

@ -96,5 +96,4 @@ async def load_and_transform(
# except Exception:
# template.environment.handle_exception()
print("data", fullpath, data)
return data

View File

@ -143,7 +143,10 @@ class HashFile(Utensil):
path: str
async def execute(self, channel: Channel, worktop: Worktop):
sha256 = await asyncio.get_running_loop().run_in_executor(
worktop.pools.threaded, sha256_file, self.path
)
await channel.send(sha256)
try:
sha256 = await asyncio.get_running_loop().run_in_executor(
worktop.pools.threaded, sha256_file, self.path
)
await channel.send(sha256)
except FileNotFoundError:
await channel.send(None)

View File

@ -1,6 +1,16 @@
import asyncio
import os
import shutil
import sys
import tempfile
from argparse import ArgumentParser
from os.path import join
from pathlib import Path
import requests
import toml
from scone.common.misc import eprint, sha256_file
def cli() -> None:
@ -19,8 +29,60 @@ async def cli_async() -> int:
supermarket.set_defaults(func=supermarket_cli)
argp = parser.parse_args(args)
return await argp.func(argp)
if not hasattr(argp, "func"):
parser.print_help()
return 127
cdir = Path(os.getcwd())
while not Path(cdir, "scone.head.toml").exists():
cdir = cdir.parent
if len(cdir.parts) <= 1:
eprint("Don't appear to be in a head. STOP.")
sys.exit(1)
with open(join(cdir, "scone.head.toml")) as head_toml:
head_data = toml.load(head_toml)
return await argp.func(argp, head_data, cdir)
async def supermarket_cli(argp) -> int:
return 0 # TODO
async def supermarket_cli(argp, head_data: dict, head_dir: Path) -> int:
eprint("Want to download", argp.url)
r = requests.get(argp.url, stream=True)
with tempfile.NamedTemporaryFile(delete=False) as tfp:
filename = tfp.name
for chunk in r.iter_content(4 * 1024 * 1024):
tfp.write(chunk)
eprint("Hashing", filename)
real_sha256 = sha256_file(filename)
note = f"""
Scone Supermarket
This file corresponds to {argp.url}
Downloaded by michelin.
""".strip()
target_path = Path(head_dir, ".scone-cache", "supermarket", real_sha256)
target_path.parent.mkdir(parents=True, exist_ok=True)
shutil.move(filename, str(target_path))
with open(str(target_path) + ".txt", "w") as fout:
# leave a note so we can find out what this is if we need to.
fout.write(note)
print("[[supermarket]]")
print(f'url = "{argp.url}"')
print(f'sha256 = "{real_sha256}"')
print("dest = ")
print("#owner = bob")
print("#group = laura")
print('#mode = "ug=rw,o=r"')
return 0

View File

@ -60,6 +60,7 @@ ResourceEdgeDirectiveKind:
ResourceEdgeDirective[ws=' \t']:
kind=ResourceEdgeDirectiveKind
resource=Resource
/\n/+
;
RecipeEdgeDirectiveKind:
@ -70,6 +71,7 @@ RecipeEdgeDirective[ws=' \t']:
kind=RecipeEdgeDirectiveKind
':' id=ID
// TODO 'on other sous' ?
/\n/+
;
ListenEdgeDirectiveKind:

View File

@ -1,9 +1,9 @@
import logging
import os
import typing
from collections import defaultdict
from collections import defaultdict, deque
from pathlib import Path
from typing import Any, Dict, Iterable, List, Optional, Tuple, Union
from typing import Any, Deque, Dict, Iterable, List, Optional, Tuple, Union
import attr
import textx
@ -57,7 +57,7 @@ class ResourceEdgeDirective:
resource: Resource
@attr.s(auto_attribs=True)
@attr.s(auto_attribs=True, eq=False)
class MenuBlock:
id: Optional[None]
@ -138,6 +138,16 @@ def convert_textx_recipe(txrecipe_or_subblock, parent: Optional[MenuBlock]):
recipe.user_directive = directive.user
elif isinstance(directive, scoml_classes["SousDirective"]):
recipe.user_directive = directive.sous
elif isinstance(directive, scoml_classes["ResourceEdgeDirective"]):
recipe.resource_edges.append(
ResourceEdgeDirective(
directive.kind[1:], convert_textx_resource(directive.resource)
)
)
elif isinstance(directive, scoml_classes["RecipeEdgeDirective"]):
recipe.recipe_edges.append(
RecipeEdgeDirective(directive.kind[1:], directive.id)
)
else:
raise ValueError(f"Unknown directive {directive}")
@ -238,6 +248,28 @@ class MenuLoader:
"""
# TODO(feature): need to think about scoping rules and then figure
# this one out
# TEMPORARY, UNSTABLE TODO(stabilise) resolution rules
# need to consider resolution between files, and IDless resolution
# get the root ancestor of the referrer
a: Union[MenuBlock, MenuRecipe] = referrer
while a.parent is not None:
a = a.parent
to_visit: Deque[Union[MenuBlock, MenuRecipe]] = deque()
to_visit.append(a)
while to_visit:
next_node = to_visit.popleft()
if next_node.id == reference:
return next_node
if isinstance(next_node, MenuBlock):
for child in next_node.contents:
to_visit.append(child)
return None
def _get_first_common_ancestor(
@ -389,6 +421,8 @@ class MenuLoader:
fors: Tuple[ForDirective, ...],
applicable_souss: Iterable[str],
):
# TODO(feature): add edges
# add fors
fors = fors + tuple(recipe.for_directives)
@ -400,6 +434,36 @@ class MenuLoader:
for _vars, for_indices in self._for_apply(fors, sous_vars, tuple()):
instance = self._recipes[recipe][(sous, for_indices)] # noqa
for recipe_edge in recipe.recipe_edges:
target = self.resolve_ref(recipe, recipe_edge.recipe_id)
if isinstance(target, MenuBlock):
# TODO(feature)
raise NotImplementedError(
"@after/@before on block is not yet here sadly"
)
elif isinstance(target, MenuRecipe):
for target_instance in self.get_related_instances(
sous, for_indices, recipe, target
):
if recipe_edge.kind == "after":
self._dag.add_ordering(target_instance, instance)
elif recipe_edge.kind == "before":
self._dag.add_ordering(instance, target_instance)
for resource_edge in recipe.resource_edges:
resource = resource_edge.resource
if resource.sous == "(self)":
resource = attr.evolve(resource, sous=sous)
if resource_edge.kind == "needs":
self._dag.needs(instance, resource)
elif resource_edge.kind == "wants":
self._dag.needs(instance, resource, soft_wants=True)
elif resource_edge.kind == "provides":
self._dag.provides(instance, resource)
# XXX apply specific edges here including those from parent
def postdagify_block(
@ -410,6 +474,8 @@ class MenuLoader:
):
# XXX pass down specific edges here
# TODO(feature): add edges
fors = fors + tuple(block.for_directives)
if block.sous_directive:

View File

@ -34,6 +34,7 @@ async def open_ssh_sous(
else:
opts = SSHClientConnectionOptions(username=user)
logger.debug("Connecting to %s[%s]@%s over SSH...", user, requested_user, host)
conn: SSHClientConnection = await asyncssh.connect(host, options=opts)
if requested_user != user:
@ -50,17 +51,12 @@ async def open_ssh_sous(
process: SSHClientProcess = await conn.create_process(command, encoding=None)
logger.debug("Constructing AsyncSSHChanPro...")
cp = AsyncSSHChanPro(conn, process)
logger.debug("Creating root channel...")
ch = cp.new_channel(number=0, desc="Root channel")
cp.start_listening_to_channels(default_route=None)
logger.debug("Sending head hello...")
await ch.send({"hello": "head"})
logger.debug("Waiting for sous hello...")
logger.debug("Waiting for sous hello from %s[%s]@%s...", user, requested_user, host)
sous_hello = await ch.recv()
logger.debug("Got sous hello... checking")
assert isinstance(sous_hello, dict)
assert sous_hello["hello"] == "sous"
logger.debug("Valid sous hello...")
return cp, ch

View File

@ -5,7 +5,9 @@ from typing import Any, Dict, List, NamedTuple, Optional, Set
ExpressionPart = NamedTuple("ExpressionPart", [("kind", str), ("value", str)])
def flatten_dict(nested: Dict[str, Any]) -> Dict[str, Any]:
def flatten_dict(
nested: Dict[str, Any], discard_empty_dicts: bool = False
) -> Dict[str, Any]:
for key in nested:
if not isinstance(key, str):
# not possible to flatten
@ -14,7 +16,7 @@ def flatten_dict(nested: Dict[str, Any]) -> Dict[str, Any]:
flat = {}
for key, value in nested.items():
if isinstance(value, dict) and value:
if isinstance(value, dict) and (discard_empty_dicts or value):
sub_flat = flatten_dict(value)
for k in sub_flat:
if not isinstance(k, str):
@ -81,7 +83,7 @@ def merge_right_into_left_inplace(left: dict, right: dict):
if isinstance(value, dict) and key in left and isinstance(left[key], dict):
merge_right_into_left_inplace(left[key], value)
else:
left[key] = value
left[key] = deepcopy(value)
class Variables:
@ -94,6 +96,8 @@ class Variables:
keys = name.split(".")
try:
for k in keys:
if not isinstance(current, dict):
raise ValueError(f"non-dictionary encountered when getting {name}")
current = current[k]
return current
except KeyError:
@ -152,10 +156,13 @@ class Variables:
return out
def load_vars_with_substitutions(self, incoming: Dict[str, Any]):
incoming = flatten_dict(incoming)
incoming = flatten_dict(incoming, discard_empty_dicts=True)
while incoming:
key, expr = incoming.popitem()
value = self._eval_with_incoming(expr, incoming)
if isinstance(expr, str):
value = self._eval_with_incoming(expr, incoming)
else:
value = expr
self.set_dotted(key, value)
def eval(self, expr: str) -> Any:

View File

@ -136,7 +136,8 @@ setup(
entry_points={
'console_scripts': [
'scone=scone.head.cli:cli',
'scone-freezer=scone.head.cli.freezer:cli'
'scone-freezer=scone.head.cli.freezer:cli',
'michelin=scone.head.cli.michelin:cli'
],
},
install_requires=REQUIRED,