Implement listen/watch edges
This commit is contained in:
parent
194250d072
commit
9da8b4234e
@ -137,6 +137,9 @@ class RecipeDag:
|
|||||||
|
|
||||||
self.resource_time: Dict[Resource, int] = dict()
|
self.resource_time: Dict[Resource, int] = dict()
|
||||||
|
|
||||||
|
# true for 'only when changed', false for '@when changed'
|
||||||
|
self.watching: Dict[Recipe, Dict[Vertex, bool]] = defaultdict(dict)
|
||||||
|
|
||||||
def add(self, vertex: Vertex):
|
def add(self, vertex: Vertex):
|
||||||
self.vertices.add(vertex)
|
self.vertices.add(vertex)
|
||||||
if isinstance(vertex, Recipe):
|
if isinstance(vertex, Recipe):
|
||||||
@ -211,3 +214,10 @@ class RecipeDag:
|
|||||||
after_meta.incoming_uncompleted += 1
|
after_meta.incoming_uncompleted += 1
|
||||||
# TODO if after_meta.state ==
|
# TODO if after_meta.state ==
|
||||||
# TODO else ...
|
# TODO else ...
|
||||||
|
|
||||||
|
def watches(self, recipe: "Recipe", watching: Vertex, only_when: bool) -> None:
|
||||||
|
if watching not in self.edges or recipe not in self.edges[watching]:
|
||||||
|
raise ValueError(
|
||||||
|
f"{recipe} needs to be after {watching} before it can watch it."
|
||||||
|
)
|
||||||
|
self.watching[recipe][watching] = only_when
|
||||||
|
@ -80,8 +80,8 @@ ListenEdgeDirectiveKind:
|
|||||||
|
|
||||||
ListenEdgeDirective[ws=' \t']:
|
ListenEdgeDirective[ws=' \t']:
|
||||||
kind=ListenEdgeDirectiveKind
|
kind=ListenEdgeDirectiveKind
|
||||||
(recipe_id=IdString | resource=Resource)
|
((':' recipe_id=IdString) | resource=Resource)
|
||||||
'changes'
|
'changes' /\n/+
|
||||||
;
|
;
|
||||||
|
|
||||||
|
|
||||||
|
@ -208,9 +208,32 @@ class Kitchen:
|
|||||||
) -> Tuple[Optional[DependencyBook], bool]:
|
) -> Tuple[Optional[DependencyBook], bool]:
|
||||||
"""
|
"""
|
||||||
:param recipe: recipe to inquire about
|
:param recipe: recipe to inquire about
|
||||||
:return: dep book, or None if there wasn't one
|
:return: dep book, or None if there wasn't one needed to compute this
|
||||||
and true if the recipe should be skipped, false otherwise.
|
and true if the recipe should be skipped, false otherwise.
|
||||||
"""
|
"""
|
||||||
|
|
||||||
|
only_when_flag_set = False
|
||||||
|
|
||||||
|
if recipe in self.head.dag.watching:
|
||||||
|
for watching, only_when in self.head.dag.watching[recipe].items():
|
||||||
|
if isinstance(watching, Resource):
|
||||||
|
# only recipe watches are accepted here.
|
||||||
|
# resource watches are handled by adding them to the watchlist
|
||||||
|
# in the dependency book
|
||||||
|
continue
|
||||||
|
assert isinstance(watching, Recipe)
|
||||||
|
only_when_flag_set |= only_when
|
||||||
|
watch_rmeta = self.head.dag.recipe_meta[watching]
|
||||||
|
if watch_rmeta.state == RecipeState.COOKED:
|
||||||
|
# underlying recipe changed. Ideally want a new changed state.
|
||||||
|
# TODO(design): have a 'changed' state for recipes?
|
||||||
|
return None, False
|
||||||
|
|
||||||
|
if only_when_flag_set:
|
||||||
|
# TODO(design) is it sensible to skip this here? What if we need to
|
||||||
|
# provide something? I suppose it's not guaranteed to be provided.
|
||||||
|
return None, True
|
||||||
|
|
||||||
inquiry = await self._dependency_store.inquire(recipe)
|
inquiry = await self._dependency_store.inquire(recipe)
|
||||||
if inquiry is None:
|
if inquiry is None:
|
||||||
return None, False
|
return None, False
|
||||||
@ -288,6 +311,16 @@ class Kitchen:
|
|||||||
meta.state = RecipeState.FAILED
|
meta.state = RecipeState.FAILED
|
||||||
raise RuntimeError(f"Recipe {next_job} failed!") from e
|
raise RuntimeError(f"Recipe {next_job} failed!") from e
|
||||||
eprint(f"cooked {next_job}")
|
eprint(f"cooked {next_job}")
|
||||||
|
|
||||||
|
if next_job in self.head.dag.watching:
|
||||||
|
for watching, only_when in self.head.dag.watching[
|
||||||
|
next_job
|
||||||
|
].items():
|
||||||
|
if isinstance(watching, Resource):
|
||||||
|
# recipe watches are handled when loading the
|
||||||
|
# dependency book.
|
||||||
|
tracker.watch(watching)
|
||||||
|
|
||||||
await self._store_dependency(next_job)
|
await self._store_dependency(next_job)
|
||||||
meta.state = RecipeState.COOKED
|
meta.state = RecipeState.COOKED
|
||||||
elif isinstance(next_job, Resource):
|
elif isinstance(next_job, Resource):
|
||||||
|
@ -74,6 +74,14 @@ class ResourceEdgeDirective:
|
|||||||
resource: Resource
|
resource: Resource
|
||||||
|
|
||||||
|
|
||||||
|
@attr.s(auto_attribs=True)
|
||||||
|
class ListenEdgeDirective:
|
||||||
|
# "when" or "only when"
|
||||||
|
kind: str
|
||||||
|
|
||||||
|
recipe_or_resource: Union[str, Resource]
|
||||||
|
|
||||||
|
|
||||||
@attr.s(auto_attribs=True, eq=False)
|
@attr.s(auto_attribs=True, eq=False)
|
||||||
class MenuBlock:
|
class MenuBlock:
|
||||||
id: Optional[None]
|
id: Optional[None]
|
||||||
@ -109,6 +117,7 @@ class MenuRecipe:
|
|||||||
for_directives: List[ForDirective] = attr.ib(factory=list)
|
for_directives: List[ForDirective] = attr.ib(factory=list)
|
||||||
recipe_edges: List[RecipeEdgeDirective] = attr.ib(factory=list)
|
recipe_edges: List[RecipeEdgeDirective] = attr.ib(factory=list)
|
||||||
resource_edges: List[ResourceEdgeDirective] = attr.ib(factory=list)
|
resource_edges: List[ResourceEdgeDirective] = attr.ib(factory=list)
|
||||||
|
listen_edges: List[ListenEdgeDirective] = attr.ib(factory=list)
|
||||||
|
|
||||||
|
|
||||||
def convert_textx_value(txvalue) -> Any:
|
def convert_textx_value(txvalue) -> Any:
|
||||||
@ -165,6 +174,14 @@ def convert_textx_recipe(txrecipe_or_subblock, parent: Optional[MenuBlock]):
|
|||||||
recipe.recipe_edges.append(
|
recipe.recipe_edges.append(
|
||||||
RecipeEdgeDirective(directive.kind[1:], directive.id)
|
RecipeEdgeDirective(directive.kind[1:], directive.id)
|
||||||
)
|
)
|
||||||
|
elif isinstance(directive, scoml_classes["ListenEdgeDirective"]):
|
||||||
|
recipe.listen_edges.append(
|
||||||
|
ListenEdgeDirective(
|
||||||
|
directive.kind[1:],
|
||||||
|
directive.recipe_id
|
||||||
|
or convert_textx_resource(directive.resource),
|
||||||
|
)
|
||||||
|
)
|
||||||
else:
|
else:
|
||||||
raise ValueError(f"Unknown directive {directive}")
|
raise ValueError(f"Unknown directive {directive}")
|
||||||
|
|
||||||
@ -185,7 +202,9 @@ def convert_textx_resource(txresource) -> Resource:
|
|||||||
else:
|
else:
|
||||||
sous = txresource.sous
|
sous = txresource.sous
|
||||||
|
|
||||||
return Resource(txresource.type, txresource.primary, sous, extra_params)
|
return Resource(
|
||||||
|
txresource.type, convert_textx_value(txresource.primary), sous, extra_params
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
def convert_textx_block(txblock, parent: Optional[MenuBlock]) -> MenuBlock:
|
def convert_textx_block(txblock, parent: Optional[MenuBlock]) -> MenuBlock:
|
||||||
@ -450,7 +469,7 @@ class MenuLoader:
|
|||||||
fors: Tuple[ForDirective, ...],
|
fors: Tuple[ForDirective, ...],
|
||||||
applicable_souss: Iterable[str],
|
applicable_souss: Iterable[str],
|
||||||
sous_mask: Optional[Set[str]],
|
sous_mask: Optional[Set[str]],
|
||||||
):
|
) -> None:
|
||||||
# TODO(feature): add edges
|
# TODO(feature): add edges
|
||||||
|
|
||||||
# add fors
|
# add fors
|
||||||
@ -497,7 +516,36 @@ class MenuLoader:
|
|||||||
elif resource_edge.kind == "provides":
|
elif resource_edge.kind == "provides":
|
||||||
self._dag.provides(instance, resource)
|
self._dag.provides(instance, resource)
|
||||||
|
|
||||||
# XXX apply specific edges here including those from parent
|
for listen_edge in recipe.listen_edges:
|
||||||
|
if isinstance(listen_edge.recipe_or_resource, Resource):
|
||||||
|
# TODO(design): is it right for this to NEED it rather
|
||||||
|
# than WANT it?
|
||||||
|
resource = listen_edge.recipe_or_resource
|
||||||
|
if resource.sous == "(self)":
|
||||||
|
resource = attr.evolve(resource, sous=sous)
|
||||||
|
self._dag.needs(instance, resource)
|
||||||
|
self._dag.watches(
|
||||||
|
instance, resource, listen_edge.kind == "only when",
|
||||||
|
)
|
||||||
|
elif isinstance(listen_edge.recipe_or_resource, str):
|
||||||
|
target = self.resolve_ref(
|
||||||
|
recipe, listen_edge.recipe_or_resource
|
||||||
|
)
|
||||||
|
|
||||||
|
if isinstance(target, MenuRecipe):
|
||||||
|
for target_instance in self.get_related_instances(
|
||||||
|
sous, for_indices, recipe, target
|
||||||
|
):
|
||||||
|
self._dag.add_ordering(target_instance, instance)
|
||||||
|
self._dag.watches(
|
||||||
|
instance,
|
||||||
|
target_instance,
|
||||||
|
listen_edge.kind == "only when",
|
||||||
|
)
|
||||||
|
else:
|
||||||
|
raise RuntimeError(f"not supported on target: {target!r}")
|
||||||
|
|
||||||
|
# XXX apply edges from parent
|
||||||
|
|
||||||
def postdagify_block(
|
def postdagify_block(
|
||||||
self,
|
self,
|
||||||
|
Loading…
Reference in New Issue
Block a user