Experimental deduplication support
This commit is contained in:
parent
dc5a5b9909
commit
9173c75a7f
@ -20,7 +20,7 @@ import logging
|
||||
import time
|
||||
from copy import deepcopy
|
||||
from hashlib import sha256
|
||||
from typing import TYPE_CHECKING, Any, Dict, Optional, Tuple, Union
|
||||
from typing import TYPE_CHECKING, Any, Dict, List, Optional, Tuple, Union
|
||||
|
||||
import aiosqlite
|
||||
import attr
|
||||
@ -81,6 +81,9 @@ class DependencyBook:
|
||||
cache_data: Dict[str, Any] = dict()
|
||||
ignored: bool = False
|
||||
|
||||
var_list: List[str] = list()
|
||||
varhash: str = ""
|
||||
|
||||
# TODO(performance, feature): track more in-depth details, perhaps as a
|
||||
# per-resource cache thing, so that we can track the info needed to know
|
||||
# if it changed...?
|
||||
@ -92,6 +95,8 @@ class DependencyBook:
|
||||
"last_changed": self.last_changed,
|
||||
"cache_data": self.cache_data,
|
||||
"ignored": self.ignored,
|
||||
"var_list": self.var_list,
|
||||
"varhash": self.varhash,
|
||||
}
|
||||
|
||||
@staticmethod
|
||||
@ -105,6 +110,8 @@ class DependencyBook:
|
||||
last_changed=dictionary["last_changed"],
|
||||
cache_data=dictionary["cache_data"],
|
||||
ignored=dictionary["ignored"],
|
||||
var_list=dictionary["var_list"],
|
||||
varhash=dictionary["varhash"],
|
||||
)
|
||||
|
||||
|
||||
@ -118,27 +125,41 @@ cattr.global_converter.register_structure_hook(
|
||||
|
||||
class DependencyTracker:
|
||||
def __init__(self, book: DependencyBook, dag: "RecipeDag", recipe: "Recipe"):
    """
    Set up the tracking state for one recipe.

    :param book: dependency book that this tracker fills in
    :param dag: recipe DAG; its ``resource_time`` mapping is read by
        ``watch`` and written by ``provide``
    :param recipe: the recipe this tracker was created for
    """
    # Wall-clock time in milliseconds, sampled once at construction;
    # ``provide`` falls back to it when no explicit time is passed.
    self._time: int = int(time.time() * 1000)

    self._recipe: "Recipe" = recipe
    self._dag: "RecipeDag" = dag

    # Both names are bound to the same book object. The public ``book``
    # name is kept because other code in this view still reads
    # ``dependency_tracker.book``.
    self._book: DependencyBook = book
    self.book: DependencyBook = book

    # Variables registered through ``register_variable``; ``build_book``
    # turns them into the book's ``var_list`` and ``varhash``.
    self._vars: Dict[str, Any] = {}
|
||||
|
||||
def build_book(self) -> DependencyBook:
    """
    Finalise the dependency book from the variables registered so far.

    :return: the tracker's own book object (not a copy), with ``varhash``
        set to ``hash_dict`` of the registered variables and ``var_list``
        set to their names in sorted order
    """
    registered = self._vars
    book = self._book
    # The hash is assigned before the name list, as in the original order.
    book.varhash = hash_dict(registered)
    book.var_list = sorted(registered)
    return book
|
||||
|
||||
def watch(self, resource: Resource) -> None:
    """
    Record the time currently stored for ``resource`` in the book.

    The book's ``watching`` mapping is later compared against the DAG's
    ``resource_time`` by ``Kitchen._should_skip`` to decide whether the
    recipe is out of date.

    :param resource: resource to watch; it must already have an entry in
        the DAG's ``resource_time`` mapping
    :raises RuntimeError: if the DAG has no time recorded for the resource
    """
    try:
        provided_at = self._dag.resource_time[resource]
    except KeyError as ke:
        raise RuntimeError(
            f"Can't watch {resource!r} because it hasn't been provided (yet)!"
        ) from ke

    # Only write to the book once the lookup succeeded, so a failed watch
    # never leaves a placeholder timestamp behind.
    self._book.watching[resource] = provided_at
|
||||
|
||||
def provide(self, resource: Resource, time: Optional[int] = None) -> None:
    """
    Record in the DAG that ``resource`` has been provided.

    :param resource: the resource being provided
    :param time: time to record for the resource; when None, the
        timestamp taken at the tracker's construction is used
    """
    if time is None:
        time = self._time

    resource_time = self._dag.resource_time
    # We use the maximum time because multiple recipes may provide something
    # and we should be careful to define a consistent behaviour in this case.
    # The stored value must not be overwritten before this comparison,
    # otherwise an earlier `time` would replace a later recorded one.
    resource_time[resource] = max(time, resource_time.get(resource, -1))
|
||||
|
||||
def ignore(self) -> None:
    """
    Mark the book being built as ignored.

    ``Kitchen._should_skip`` treats an ignored previous book as not valid
    and never skips the recipe on its basis.
    """
    self._book.ignored = True
|
||||
|
||||
def register_variable(self, variable: str, value: Union[dict, str, int]):
    """
    Remember a variable (and its value) that the recipe has used.

    The registered variables are read later by ``build_book``, which
    stores their sorted names and a hash of them in the dependency book.

    :param variable: name of the variable
    :param value: value of the variable as seen by the recipe
    """
    # Store a copy and we'll read it later: a deep copy means that later
    # mutation of a dict value by the caller cannot change what gets hashed.
    self._vars[variable] = deepcopy(value)
|
||||
|
||||
def register_fridge_file(self, desugared_path: str):
|
||||
# TODO this is not complete
|
||||
|
@ -17,6 +17,7 @@
|
||||
|
||||
import asyncio
|
||||
import logging
|
||||
import time
|
||||
from asyncio import Future, Queue
|
||||
from collections import deque
|
||||
from contextvars import ContextVar
|
||||
@ -33,6 +34,7 @@ from scone.head.dependency_tracking import (
|
||||
DependencyBook,
|
||||
DependencyCache,
|
||||
DependencyTracker,
|
||||
hash_dict,
|
||||
)
|
||||
from scone.head.head import Head
|
||||
from scone.head.recipe import Recipe
|
||||
@ -133,6 +135,7 @@ class Kitchen:
|
||||
self.last_updated_ats: Dict[Resource, int] = dict()
|
||||
self._cookable: Queue[Optional[Vertex]] = Queue()
|
||||
self._sleeper_slots: int = 0
|
||||
self._kitchen_time: int = int(1000 * time.time())
|
||||
|
||||
def get_dependency_tracker(self):
    """
    Look up the dependency tracker of the recipe that is current here.

    :return: the entry of ``self._dependency_trackers`` keyed by the value
        of ``current_recipe.get()``
    """
    active_recipe = current_recipe.get()
    return self._dependency_trackers[active_recipe]
|
||||
@ -198,6 +201,45 @@ class Kitchen:
|
||||
|
||||
await asyncio.gather(*workers, return_exceptions=False)
|
||||
|
||||
async def _should_skip(
    self, recipe: Recipe
) -> Tuple[Optional[DependencyBook], bool]:
    """
    Decide whether a recipe can be skipped, based on the dependency book
    stored for it previously.

    :param recipe: recipe to inquire about
    :return: a pair of the previous dep book (None if the dependency store
        had none) and a flag: true if the recipe should be skipped,
        false otherwise.
    """
    found = await self._dependency_store.inquire(recipe)
    if found is None:
        return None, False
    _id, previous = found

    # ignored books are not valid...
    if previous.ignored:
        return previous, False

    # compute and compare the var hash...
    # Only the variables listed in the previous book are looked up, using
    # their current values for the recipe's sous.
    sous_vars = self.head.variables[recipe.recipe_context.sous]
    current_values = {
        name: sous_vars.get_dotted(name) for name in previous.var_list
    }
    current_hash = hash_dict(current_values)
    if previous.varhash != current_hash:
        return previous, False

    # compare watched resources...
    for resource, seen_time in previous.watching.items():
        now_time = self.head.dag.resource_time.get(resource)
        # A missing time suggests something has changed in a significant
        # way; a differing time means the recipe is out of date.
        if now_time is None or now_time != seen_time:
            return previous, False

    return previous, True
|
||||
|
||||
async def _cooking_worker(self):
|
||||
dag = self.head.dag
|
||||
while True:
|
||||
@ -218,23 +260,34 @@ class Kitchen:
|
||||
if isinstance(next_job, Recipe):
|
||||
meta = dag.recipe_meta[next_job]
|
||||
|
||||
# TODO try to deduplicate
|
||||
meta.state = RecipeState.BEING_COOKED
|
||||
current_recipe.set(next_job)
|
||||
eprint(f"cooking {next_job}")
|
||||
self._dependency_trackers[next_job] = DependencyTracker(
|
||||
DependencyBook(), dag, next_job
|
||||
)
|
||||
try:
|
||||
await next_job.cook(self)
|
||||
except Exception as e:
|
||||
meta.state = RecipeState.FAILED
|
||||
raise RuntimeError(f"Recipe {next_job} failed!") from e
|
||||
eprint(f"cooked {next_job}")
|
||||
# TODO cook
|
||||
# TODO store depbook
|
||||
await self._store_dependency(next_job)
|
||||
meta.state = RecipeState.COOKED
|
||||
last_book, should_skip = await self._should_skip(next_job)
|
||||
if should_skip and last_book:
|
||||
meta.state = RecipeState.SKIPPED
|
||||
# provide stuff that it provided last time
|
||||
for res, last_update_time in last_book.provided.items():
|
||||
dag.resource_time[res] = max(
|
||||
last_update_time, dag.resource_time.get(res, -1)
|
||||
)
|
||||
else:
|
||||
meta.state = RecipeState.BEING_COOKED
|
||||
current_recipe.set(next_job)
|
||||
eprint(f"cooking {next_job}")
|
||||
tracker = DependencyTracker(DependencyBook(), dag, next_job)
|
||||
self._dependency_trackers[next_job] = tracker
|
||||
try:
|
||||
await next_job.cook(self)
|
||||
|
||||
# provide stuff
|
||||
for outgoing in dag.edges[next_job]:
|
||||
if not isinstance(outgoing, Resource):
|
||||
continue
|
||||
tracker.provide(outgoing)
|
||||
except Exception as e:
|
||||
meta.state = RecipeState.FAILED
|
||||
raise RuntimeError(f"Recipe {next_job} failed!") from e
|
||||
eprint(f"cooked {next_job}")
|
||||
await self._store_dependency(next_job)
|
||||
meta.state = RecipeState.COOKED
|
||||
elif isinstance(next_job, Resource):
|
||||
eprint(f"have {next_job}")
|
||||
pass
|
||||
@ -328,7 +381,7 @@ class Kitchen:
|
||||
dependency_tracker = self._dependency_trackers.pop(recipe, None)
|
||||
if not dependency_tracker:
|
||||
raise KeyError(f"Recipe {recipe} has not been tracked.")
|
||||
depbook = dependency_tracker.book
|
||||
depbook = dependency_tracker.build_book()
|
||||
if depbook:
|
||||
await self._dependency_store.register(recipe, depbook)
|
||||
|
||||
|
Loading…
x
Reference in New Issue
Block a user