STT-tensorflow/tensorflow/tools/dockerfiles/assembler.py

# Copyright 2018 The TensorFlow Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ============================================================================
"""Multipurpose TensorFlow Docker Helper.
- Assembles Dockerfiles
- Builds images (and optionally runs image tests)
- Pushes images to Docker Hub (provided with credentials)
Logs are written to stderr; the list of successfully built images is
written to stdout.
Read README.md (in this directory) for instructions!
"""
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function

import collections
import copy
import errno
import itertools
import json
import multiprocessing
import os
import platform
import re
import shutil
import sys

from absl import app
from absl import flags
import cerberus
import docker
import yaml

FLAGS = flags.FLAGS

flags.DEFINE_string('hub_username', None,
                    'Docker Hub username, only used with --upload_to_hub')

flags.DEFINE_string(
    'hub_password', None,
    ('Docker Hub password, only used with --upload_to_hub. Pass it in via an '
     'environment variable so the password does not end up in your shell '
     'history.'))

flags.DEFINE_integer('hub_timeout', 3600,
                     'Abort Hub upload if it takes longer than this.')

flags.DEFINE_string(
    'repository', 'tensorflow',
    'Tag local images as {repository}:tag (in addition to the '
    'hub_repository, if uploading to hub)')

flags.DEFINE_string(
    'hub_repository', None,
    'Push tags to this Docker Hub repository, e.g. tensorflow/tensorflow')

flags.DEFINE_boolean(
    'upload_to_hub',
    False,
    ('Push built images to Docker Hub (you must also provide --hub_username, '
     '--hub_password, and --hub_repository)'),
    short_name='u',
)

flags.DEFINE_boolean(
    'construct_dockerfiles',
    False,
    'Generate Dockerfiles and write them to --dockerfile_dir.',
    short_name='d')

flags.DEFINE_boolean(
    'keep_temp_dockerfiles',
    False,
    'Retain .temp.Dockerfiles created while building images.',
    short_name='k')

flags.DEFINE_boolean(
    'build_images',
    False,
    'Build Docker images from the assembled Dockerfiles.',
    short_name='b')

flags.DEFINE_string(
    'run_tests_path', None,
    ('Execute test scripts on generated Dockerfiles before pushing them. '
     'Flag value must be a full path to the "tests" directory, which is '
     'usually $(realpath ./tests). A failed test counts the same as a failed '
     'build.'))

flags.DEFINE_boolean(
    'stop_on_failure', False,
    ('Stop processing tags if any one build fails. If False or not specified, '
     'failures are reported but do not affect the other images.'))

flags.DEFINE_boolean(
    'dry_run',
    False,
    'Do not build or deploy anything at all.',
    short_name='n',
)

flags.DEFINE_string(
    'exclude_tags_matching',
    None,
    ('Regular expression for tags to skip. It must match the entire tag, '
     'e.g. ".*gpu.*" skips all GPU tags.'),
    short_name='x')

flags.DEFINE_string(
    'only_tags_matching',
    None,
    ('Regular expression for tags to keep; any tag it does not match is '
     'skipped. It must match the entire tag, e.g. ".*gpu.*" includes only '
     'GPU tags.'),
    short_name='i')

flags.DEFINE_string(
    'dockerfile_dir',
    './dockerfiles',
    'Path to an output directory for Dockerfiles.'
    ' Will be created if it doesn\'t exist.'
    ' Existing files in this directory will be deleted when new Dockerfiles'
    ' are made.',
    short_name='o')

flags.DEFINE_string(
    'partial_dir',
    './partials',
    'Path to a directory containing foo.partial.Dockerfile partial files.'
    ' Can have subdirectories, e.g. "bar/baz.partial.Dockerfile".',
    short_name='p')

flags.DEFINE_multi_string(
    'release', [],
    'Set of releases to build and tag. Defaults to every release type.',
    short_name='r')

flags.DEFINE_multi_string(
    'arg', [],
    ('Extra build arguments. These are used for expanding tag names if needed '
     '(e.g. --arg _TAG_PREFIX=foo) and for using as build arguments (unused '
     'args will print a warning).'),
    short_name='a')

flags.DEFINE_boolean(
    'nocache', False,
    'Disable the Docker build cache; identical to "docker build --no-cache"')

flags.DEFINE_string(
    'spec_file',
    './spec.yml',
    'Path to the YAML specification file',
    short_name='s')

# Schema to verify the contents of tag-spec.yml with Cerberus.
# Must be converted to a dict from yaml to work.
# Note: can add python references with e.g.
# !!python/name:builtins.str
# !!python/name:__main__.funcname
# (but this may not be considered safe?)
SCHEMA_TEXT = """
header:
  type: string

slice_sets:
  type: dict
  keyschema:
    type: string
  valueschema:
    type: list
    schema:
      type: dict
      schema:
        add_to_name:
          type: string
        dockerfile_exclusive_name:
          type: string
        dockerfile_subdirectory:
          type: string
        partials:
          type: list
          schema:
            type: string
            ispartial: true
        test_runtime:
          type: string
          required: false
        tests:
          type: list
          default: []
          schema:
            type: string
        args:
          type: list
          default: []
          schema:
            type: string
            isfullarg: true

releases:
  type: dict
  keyschema:
    type: string
  valueschema:
    type: dict
    schema:
      is_dockerfiles:
        type: boolean
        required: false
        default: false
      upload_images:
        type: boolean
        required: false
        default: true
      tag_specs:
        type: list
        required: true
        schema:
          type: string
"""
class TfDockerTagValidator(cerberus.Validator):
  """Custom Cerberus validator for TF tag spec.

  Note: Each _validate_foo function's docstring must end with a segment
  describing its own validation schema, e.g. "The rule's arguments are...". If
  you add a new validator, you can copy/paste that section.
  """

  def __init__(self, *args, **kwargs):
    # See http://docs.python-cerberus.org/en/stable/customize.html
    if 'partials' in kwargs:
      self.partials = kwargs['partials']
    super(cerberus.Validator, self).__init__(*args, **kwargs)

  def _validate_ispartial(self, ispartial, field, value):
    """Validate that a partial references an existing partial spec.

    Args:
      ispartial: Value of the rule, a bool
      field: The field being validated
      value: The field's value

    The rule's arguments are validated against this schema:
    {'type': 'boolean'}
    """
    if ispartial and value not in self.partials:
      self._error(field,
                  '{} is not present in the partials directory.'.format(value))

  def _validate_isfullarg(self, isfullarg, field, value):
    """Validate that a string is either a FULL=arg or NOT.

    Args:
      isfullarg: Value of the rule, a bool
      field: The field being validated
      value: The field's value

    The rule's arguments are validated against this schema:
    {'type': 'boolean'}
    """
    if isfullarg and '=' not in value:
      self._error(field, '{} should be of the form ARG=VALUE.'.format(value))
    if not isfullarg and '=' in value:
      self._error(field, '{} should be of the form ARG (no =).'.format(value))


def eprint(*args, **kwargs):
  print(*args, file=sys.stderr, flush=True, **kwargs)


def aggregate_all_slice_combinations(spec, slice_set_names):
  """Figure out all of the possible slice groupings for a tag spec."""
  slice_sets = copy.deepcopy(spec['slice_sets'])

  for name in slice_set_names:
    for slice_set in slice_sets[name]:
      slice_set['set_name'] = name

  slices_grouped_but_not_keyed = [slice_sets[name] for name in slice_set_names]
  all_slice_combos = list(itertools.product(*slices_grouped_but_not_keyed))
  return all_slice_combos


def build_name_from_slices(format_string, slices, args, is_dockerfile=False):
  """Build the tag name (cpu-devel...) from a list of slices."""
  name_formatter = copy.deepcopy(args)
  name_formatter.update({s['set_name']: s['add_to_name'] for s in slices})
  name_formatter.update({
      s['set_name']: s['dockerfile_exclusive_name']
      for s in slices
      if is_dockerfile and 'dockerfile_exclusive_name' in s
  })
  name = format_string.format(**name_formatter)
  return name
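# An illustrative example of build_name_from_slices (the slice-set and slice
# names are made up, not taken from the real spec): with format_string
# '{release}{py}', args {}, and slices
#   [{'set_name': 'release', 'add_to_name': 'latest'},
#    {'set_name': 'py', 'add_to_name': '-py3'}],
# the resulting tag name is 'latest-py3'. When is_dockerfile is True, a
# slice's 'dockerfile_exclusive_name' (if present) is used in place of its
# 'add_to_name'.

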
def update_args_dict(args_dict, updater):
  """Update a dict of arg values with more values from a list or dict."""
  if isinstance(updater, list):
    for arg in updater:
      key, sep, value = arg.partition('=')
      if sep == '=':
        args_dict[key] = value
  if isinstance(updater, dict):
    for key, value in updater.items():
      args_dict[key] = value
  return args_dict
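# For example (a sketch with made-up arg names):
#   update_args_dict({}, ['PYTHON=python3', 'BAZEL_VERSION=0.24'])
#     -> {'PYTHON': 'python3', 'BAZEL_VERSION': '0.24'}
#   update_args_dict({'PYTHON': 'python3'}, {'PYTHON': 'python2'})
#     -> {'PYTHON': 'python2'}

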
def get_slice_sets_and_required_args(slice_sets, tag_spec):
  """Extract used-slice-sets and required CLI arguments from a spec string.

  For example, {FOO}{bar}{bat} finds FOO, bar, and bat. Assuming bar and bat
  are both named slice sets, FOO must be specified on the command line.

  Args:
    slice_sets: Dict of named slice sets
    tag_spec: The tag spec string, e.g. {_FOO}{blep}

  Returns:
    (used_slice_sets, required_args), a tuple of lists
  """
  required_args = []
  used_slice_sets = []

  extract_bracketed_words = re.compile(r'\{([^}]+)\}')
  possible_args_or_slice_set_names = extract_bracketed_words.findall(tag_spec)
  for name in possible_args_or_slice_set_names:
    if name in slice_sets:
      used_slice_sets.append(name)
    else:
      required_args.append(name)

  return (used_slice_sets, required_args)


def gather_tag_args(slices, cli_input_args, required_args):
  """Build a dictionary of all the CLI and slice-specified args for a tag."""
  args = {}

  for s in slices:
    args = update_args_dict(args, s['args'])

  args = update_args_dict(args, cli_input_args)
  for arg in required_args:
    if arg not in args:
      eprint(('> Error: {} is not a valid slice_set, and also isn\'t an arg '
              'provided on the command line. If it is an arg, please specify '
              'it with --arg. If not, check the slice_sets list.'.format(arg)))
      exit(1)

  return args


def gather_slice_list_items(slices, key):
  """For a list of slices, get the flattened list of all of a certain key."""
  return list(itertools.chain(*[s[key] for s in slices if key in s]))


def find_first_slice_value(slices, key):
  """For a list of slices, get the first value for a certain key."""
  for s in slices:
    if key in s and s[key] is not None:
      return s[key]
  return None


def assemble_tags(spec, cli_args, enabled_releases, all_partials):
  """Gather all the tags based on our spec.

  Args:
    spec: Nested dict containing full Tag spec
    cli_args: List of ARG=foo arguments to pass along to Docker build
    enabled_releases: List of releases to parse. Empty list = all
    all_partials: Dict of every partial, for reference

  Returns:
    Dict of tags and how to build them
  """
  tag_data = collections.defaultdict(list)

  for name, release in spec['releases'].items():
    for tag_spec in release['tag_specs']:
      if enabled_releases and name not in enabled_releases:
        eprint('> Skipping release {}'.format(name))
        continue

      used_slice_sets, required_cli_args = get_slice_sets_and_required_args(
          spec['slice_sets'], tag_spec)

      slice_combos = aggregate_all_slice_combinations(spec, used_slice_sets)
      for slices in slice_combos:
        tag_args = gather_tag_args(slices, cli_args, required_cli_args)
        tag_name = build_name_from_slices(tag_spec, slices, tag_args,
                                          release['is_dockerfiles'])
        used_partials = gather_slice_list_items(slices, 'partials')
        used_tests = gather_slice_list_items(slices, 'tests')
        test_runtime = find_first_slice_value(slices, 'test_runtime')
        dockerfile_subdirectory = find_first_slice_value(
            slices, 'dockerfile_subdirectory')
        dockerfile_contents = merge_partials(spec['header'], used_partials,
                                             all_partials)

        tag_data[tag_name].append({
            'release': name,
            'tag_spec': tag_spec,
            'is_dockerfiles': release['is_dockerfiles'],
            'upload_images': release['upload_images'],
            'cli_args': tag_args,
            'dockerfile_subdirectory': dockerfile_subdirectory or '',
            'partials': used_partials,
            'tests': used_tests,
            'test_runtime': test_runtime,
            'dockerfile_contents': dockerfile_contents,
        })

  return tag_data


def merge_partials(header, used_partials, all_partials):
  """Merge all partial contents with their header."""
  used_partials = list(used_partials)
  return '\n'.join([header] + [all_partials[u] for u in used_partials])


def upload_in_background(hub_repository, dock, image, tag):
  """Upload a docker image (to be used by multiprocessing)."""
  image.tag(hub_repository, tag=tag)
  print(dock.images.push(hub_repository, tag=tag))


def mkdir_p(path):
  """Create a directory and its parents, even if it already exists."""
  try:
    os.makedirs(path)
  except OSError as e:
    if e.errno != errno.EEXIST:
      raise


def gather_existing_partials(partial_path):
  """Find and read all available partials.

  Args:
    partial_path (string): read partials from this directory.

  Returns:
    Dict[string, string] of partial short names (like "ubuntu/python" or
    "bazel") to the full contents of that partial.
  """
  partials = {}
  for path, _, files in os.walk(partial_path):
    for name in files:
      fullpath = os.path.join(path, name)
      if '.partial.Dockerfile' not in fullpath:
        eprint(('> Probably not a problem: skipping {}, which is not a '
                'partial.').format(fullpath))
        continue
      # partial_dir/foo/bar.partial.Dockerfile -> foo/bar
      simple_name = fullpath[len(partial_path) + 1:-len('.partial.dockerfile')]
      with open(fullpath, 'r') as f:
        partial_contents = f.read()
      partials[simple_name] = partial_contents
  return partials


def main(argv):
  if len(argv) > 1:
    raise app.UsageError('Too many command-line arguments.')

  # Read the full spec file, used for everything
  with open(FLAGS.spec_file, 'r') as spec_file:
    tag_spec = yaml.safe_load(spec_file)

  # Get existing partial contents
  partials = gather_existing_partials(FLAGS.partial_dir)

  # Abort if the spec file is invalid
  schema = yaml.safe_load(SCHEMA_TEXT)
  v = TfDockerTagValidator(schema, partials=partials)
  if not v.validate(tag_spec):
    eprint('> Error: {} is an invalid spec! The errors are:'.format(
        FLAGS.spec_file))
    eprint(yaml.dump(v.errors, indent=2))
    exit(1)
  tag_spec = v.normalized(tag_spec)

  # Assemble tags and images used to build them
  all_tags = assemble_tags(tag_spec, FLAGS.arg, FLAGS.release, partials)

  # Empty Dockerfile directory if building new Dockerfiles
  if FLAGS.construct_dockerfiles:
    eprint('> Emptying Dockerfile dir "{}"'.format(FLAGS.dockerfile_dir))
    shutil.rmtree(FLAGS.dockerfile_dir, ignore_errors=True)
    mkdir_p(FLAGS.dockerfile_dir)

  # Set up Docker helper
  dock = docker.from_env()

  # Login to Docker if uploading images
  if FLAGS.upload_to_hub:
    if not FLAGS.hub_username:
      eprint('> Error: please set --hub_username when uploading to Dockerhub.')
      exit(1)
    if not FLAGS.hub_repository:
      eprint(
          '> Error: please set --hub_repository when uploading to Dockerhub.')
      exit(1)
    if not FLAGS.hub_password:
      eprint('> Error: please set --hub_password when uploading to Dockerhub.')
      exit(1)
    dock.login(
        username=FLAGS.hub_username,
        password=FLAGS.hub_password,
    )

  # Each tag has a name ('tag') and a definition consisting of the contents
  # of its Dockerfile, its build arg list, etc.
  failed_tags = []
  succeeded_tags = []
  for tag, tag_defs in all_tags.items():
    for tag_def in tag_defs:
      eprint('> Working on {}'.format(tag))

      if FLAGS.exclude_tags_matching and re.match(FLAGS.exclude_tags_matching,
                                                  tag):
        eprint('>> Excluded due to match against "{}".'.format(
            FLAGS.exclude_tags_matching))
        continue

      if FLAGS.only_tags_matching and not re.match(FLAGS.only_tags_matching,
                                                   tag):
        eprint('>> Excluded due to failure to match against "{}".'.format(
            FLAGS.only_tags_matching))
        continue

      # Write releases marked "is_dockerfiles" into the Dockerfile directory
      if FLAGS.construct_dockerfiles and tag_def['is_dockerfiles']:
        path = os.path.join(FLAGS.dockerfile_dir,
                            tag_def['dockerfile_subdirectory'],
                            tag + '.Dockerfile')
        eprint('>> Writing {}...'.format(path))
        if not FLAGS.dry_run:
          mkdir_p(os.path.dirname(path))
          with open(path, 'w') as f:
            f.write(tag_def['dockerfile_contents'])

      # Don't build any images for dockerfile-only releases
      if not FLAGS.build_images:
        continue

      # Only build images for host architecture
      proc_arch = platform.processor()
      is_x86 = proc_arch.startswith('x86')
      if (is_x86 and any(arch in tag for arch in ['ppc64le']) or
          not is_x86 and proc_arch not in tag):
        continue

      # Generate a temporary Dockerfile to use to build, since docker-py
      # needs a filepath relative to the build context (i.e. the current
      # directory)
      dockerfile = os.path.join(FLAGS.dockerfile_dir, tag + '.temp.Dockerfile')
      if not FLAGS.dry_run:
        with open(dockerfile, 'w') as f:
          f.write(tag_def['dockerfile_contents'])
      eprint('>> (Temporary) writing {}...'.format(dockerfile))

      repo_tag = '{}:{}'.format(FLAGS.repository, tag)
      eprint('>> Building {} using build args:'.format(repo_tag))
      for arg, value in tag_def['cli_args'].items():
        eprint('>>> {}={}'.format(arg, value))

      # Note that we are NOT using cache_from, which appears to limit
      # available cache layers to those from explicitly specified layers. Many
      # of our layers are similar between local builds, so we want to use the
      # implied local build cache.
      tag_failed = False
      image, logs = None, []
      if not FLAGS.dry_run:
        try:
          # Use low level APIClient in order to stream log output
          resp = dock.api.build(
              timeout=FLAGS.hub_timeout,
              path='.',
              nocache=FLAGS.nocache,
              dockerfile=dockerfile,
              buildargs=tag_def['cli_args'],
              tag=repo_tag)
          last_event = None
          image_id = None
          # Manually process the log output, extracting build success and the
          # image ID in order to get the built image
          while True:
            try:
              output = next(resp).decode('utf-8')
              json_output = json.loads(output.strip('\r\n'))
              if 'stream' in json_output:
                eprint(json_output['stream'], end='')
                match = re.search(r'(^Successfully built |sha256:)([0-9a-f]+)$',
                                  json_output['stream'])
                if match:
                  image_id = match.group(2)
                last_event = json_output['stream']
                # Collect all log lines into the logs object
                logs.append(json_output)
            except StopIteration:
              eprint('Docker image build complete.')
              break
            except ValueError:
              eprint('Error parsing from docker image build: {}'.format(output))

          # If the image ID was never set, the image failed to build properly.
          # Raise an error in this case with the last log line and all logs.
          if image_id:
            image = dock.images.get(image_id)
          else:
            raise docker.errors.BuildError(last_event or 'Unknown', logs)

          # Run tests if requested, and dump output
          # Could be improved by backgrounding, but would need better
          # multiprocessing support to track failures properly.
          if FLAGS.run_tests_path:
            if not tag_def['tests']:
              eprint('>>> No tests to run.')
            for test in tag_def['tests']:
              eprint('>> Testing {}...'.format(test))
              container = dock.containers.run(
                  image,
                  '/tests/' + test,
                  working_dir='/',
                  log_config={'type': 'journald'},
                  detach=True,
                  stderr=True,
                  stdout=True,
                  volumes={
                      FLAGS.run_tests_path: {
                          'bind': '/tests',
                          'mode': 'ro'
                      }
                  },
                  runtime=tag_def['test_runtime'])
              ret = container.wait()
              code = ret['StatusCode']
              out = container.logs(stdout=True, stderr=False)
              err = container.logs(stdout=False, stderr=True)
              container.remove()
              if out:
                eprint('>>> Output stdout:')
                eprint(out.decode('utf-8'))
              else:
                eprint('>>> No test standard out.')
              if err:
                eprint('>>> Output stderr:')
                eprint(err.decode('utf-8'))
              else:
                eprint('>>> No test standard err.')
              if code != 0:
                eprint('>> {} failed tests with status: "{}"'.format(
                    repo_tag, code))
                failed_tags.append(tag)
                tag_failed = True
                if FLAGS.stop_on_failure:
                  eprint('>> ABORTING due to --stop_on_failure!')
                  exit(1)
              else:
                eprint('>> Tests look good!')

        except docker.errors.BuildError as e:
          eprint('>> {} failed to build with message: "{}"'.format(
              repo_tag, e.msg))
          eprint('>> Build logs follow:')
          log_lines = [l.get('stream', '') for l in e.build_log]
          eprint(''.join(log_lines))
          failed_tags.append(tag)
          tag_failed = True
          if FLAGS.stop_on_failure:
            eprint('>> ABORTING due to --stop_on_failure!')
            exit(1)

        # Clean temporary dockerfiles if they were created earlier
        if not FLAGS.keep_temp_dockerfiles:
          os.remove(dockerfile)

      # Upload new images to DockerHub as long as they built + passed tests
      if FLAGS.upload_to_hub:
        if not tag_def['upload_images']:
          continue
        if tag_failed:
          continue

        eprint('>> Uploading to {}:{}'.format(FLAGS.hub_repository, tag))
        if not FLAGS.dry_run:
          p = multiprocessing.Process(
              target=upload_in_background,
              args=(FLAGS.hub_repository, dock, image, tag))
          p.start()

      if not tag_failed:
        succeeded_tags.append(tag)

  if failed_tags:
    eprint(
        '> Some tags failed to build or failed testing, check scrollback for '
        'errors: {}'.format(','.join(failed_tags)))
    exit(1)

  eprint('> Writing built{} tags to standard out.'.format(
      ' and tested' if FLAGS.run_tests_path else ''))
  for tag in succeeded_tags:
    print('{}:{}'.format(FLAGS.repository, tag))


if __name__ == '__main__':
  app.run(main)