STT-tensorflow/tensorflow/python/keras/distribute/sidecar_evaluator.py
# Lint as: python3
# Copyright 2020 The TensorFlow Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ==============================================================================
"""Python module for evaluation loop."""
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
import re
# pylint: disable=g-direct-tensorflow-import
from tensorflow.python.framework import dtypes
from tensorflow.python.framework import errors_impl
from tensorflow.python.ops import summary_ops_v2
from tensorflow.python.ops import variables
from tensorflow.python.platform import tf_logging as logging
from tensorflow.python.training import checkpoint_utils
from tensorflow.python.training.tracking import util as tracking_util
_PRINT_EVAL_STEP_EVERY_SEC = 60.0
_ITERATIONS_UNINITIALIZED = -1

def list_checkpoint_attributes(ckpt_dir_or_file):
  """Lists all the attributes in a checkpoint.

  Checkpoint keys are paths in a checkpoint graph, and the attribute is the
  first element in the path. For example, with a checkpoint key
  "optimizer/iter/.ATTRIBUTES/VARIABLE_VALUE", `optimizer` is the attribute.
  The attribute is also used to save/restore a variable in a checkpoint,
  e.g. `tf.train.Checkpoint(optimizer=optimizer, model=model)`.

  Args:
    ckpt_dir_or_file: Directory with checkpoints file or path to checkpoint.

  Returns:
    Set of attributes in a checkpoint.
  """
  reader = checkpoint_utils.load_checkpoint(ckpt_dir_or_file)
  variable_map = reader.get_variable_to_shape_map()
  return {name.split('/')[0] for name in variable_map.keys()}
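
# A minimal illustration of the key-to-attribute mapping described above,
# assuming (hypothetically) checkpoints written with
# `tf.train.Checkpoint(model=model, optimizer=optimizer)` and a
# `tf.train.CheckpointManager`: a key such as
# 'optimizer/iter/.ATTRIBUTES/VARIABLE_VALUE' contributes the attribute
# 'optimizer', so
#
#   list_checkpoint_attributes('/tmp/checkpoint_dir/ckpt-1')
#
# would return a set along the lines of {'model', 'optimizer', 'save_counter'}
# (the exact contents depend on the objects tracked in the checkpoint).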

class SidecarEvaluator(object):
  """A class designed for a dedicated evaluator task.

  `SidecarEvaluator` is expected to be run on a process in a separate machine
  from the training cluster. It continuously loads checkpoints saved
  periodically by its training counterpart, and performs evaluation using the
  model (with compiled metrics) provided at `__init__`. It is expected to be
  used for the purpose of a dedicated evaluator, evaluating the metric results
  of a training cluster which has one or more workers performing the training
  and saving checkpoints. `SidecarEvaluator` uses `model.evaluate` for
  evaluation.

  Example:
  ```python
  model = tf.keras.models.Sequential(...)
  model.compile(metrics=tf.keras.metrics.SparseCategoricalAccuracy(
      name="eval_metrics"))
  data = tf.data.Dataset.from_tensor_slices(...)

  SidecarEvaluator(
      model=model,
      data=data,
      checkpoint_dir='/tmp/checkpoint_dir',  # dir for training-saved checkpoint
      steps=None,  # Eval until dataset is exhausted
      log_dir='/tmp/log_dir',
      max_evaluations=None  # The evaluation needs to be stopped manually
  ).start()
  ```

  `SidecarEvaluator.start` writes a series of summary files which can be
  visualized by TensorBoard (which provides a webpage link):

  ```bash
  $ tensorboard --logdir=/tmp/log_dir
  ...
  TensorBoard 2.4.0a0 at http://host:port (Press CTRL+C to quit)
  ```

  The `checkpoint_dir` should contain checkpoints that track `model` and
  `optimizer` to fulfill `SidecarEvaluator`'s expectation:

  ```python
  checkpoint = tf.train.Checkpoint(model=model, optimizer=optimizer)
  checkpoint_manager = tf.train.CheckpointManager(
      checkpoint, checkpoint_dir=..., max_to_keep=...)
  checkpoint_manager.save()
  ```
"""

  def __init__(self,
               model,
               data,
               checkpoint_dir,
               log_dir=None,
               steps=None,
               max_evaluations=None):
    """Initializes a `SidecarEvaluator` object.

    Args:
      model: Model to use for evaluation. The model object used here should be
        a `tf.keras.Model`, and should be the same as the one that is used in
        training, where `tf.keras.Model`s are checkpointed. The model should
        have one or more metrics compiled before using `SidecarEvaluator`.
      data: The input data for evaluation. `SidecarEvaluator` supports all data
        types that Keras `model.evaluate` supports as the input data `x`, such
        as a `tf.data.Dataset`.
      checkpoint_dir: Directory where checkpoint files are saved.
      log_dir: Directory where summary files for TensorBoard are saved.
      steps: Number of steps to perform evaluation for, when evaluating a
        single checkpoint file. If `None`, evaluation continues until the
        dataset is exhausted. For a repeated evaluation dataset, the user must
        specify `steps` to avoid an infinite evaluation loop.
      max_evaluations: Maximum number of checkpoint files to be evaluated, so
        that `SidecarEvaluator` knows when to stop. The evaluator stops after
        it evaluates a checkpoint filepath ending with
        '<ckpt_name>-<max_evaluations>'. If `tf.train.CheckpointManager.save`
        is used for saving checkpoints, the k-th saved checkpoint has the
        filepath suffix '<ckpt_name>-<k>' (k=1 for the first saved), and if
        checkpoints are saved every epoch during training, the filepath saved
        at the k-th epoch ends with '<ckpt_name>-<k>'. Thus, if training runs
        for n epochs and the evaluator should end after the training finishes,
        use n for this parameter. Note that this is not necessarily equal to
        the total number of evaluations, since some checkpoints may be skipped
        if evaluation is slower than checkpoint creation. If `None`,
        `SidecarEvaluator` evaluates indefinitely, and the user must terminate
        the evaluator program themselves.
    """
    self.model = model
    self.data = data
    self.checkpoint_dir = checkpoint_dir
    if log_dir:
      self._summary_writer = summary_ops_v2.create_file_writer_v2(
          logdir=log_dir)
    else:
      self._summary_writer = None
    # Sentinel value (_ITERATIONS_UNINITIALIZED) until the training iteration
    # count is restored from a checkpoint.
    self._iterations = variables.Variable(
        name='iterations',
        initial_value=_ITERATIONS_UNINITIALIZED,
        dtype=dtypes.int64)
    self.max_evaluations = max_evaluations
    self.steps = steps
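
  # Illustrative sketch of the `max_evaluations` behavior (hypothetical
  # paths): with `tf.train.CheckpointManager.save` writing one checkpoint per
  # epoch, a 5-epoch run produces '/tmp/checkpoint_dir/ckpt-1' ...
  # '/tmp/checkpoint_dir/ckpt-5', so `max_evaluations=5` makes `start` return
  # after evaluating the checkpoint whose path ends with '-5'.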

  def start(self):
    """Starts the evaluation loop."""
    optimizer_checkpoint = tracking_util.Checkpoint(iter=self._iterations)
    checkpoint = tracking_util.Checkpoint(
        model=self.model, optimizer=optimizer_checkpoint)
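    # The nested `Checkpoint` above mirrors the training-side
    # `tf.train.Checkpoint(model=model, optimizer=optimizer)`, so restoring a
    # training checkpoint maps its saved 'optimizer/iter' value onto
    # `self._iterations`.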

    for latest_checkpoint in checkpoint_utils.checkpoints_iterator(
        self.checkpoint_dir):
      try:
        # `expect_partial` because the checkpoint can have other `Trackable`s
        # such as `optimizer`.
        checkpoint.restore(latest_checkpoint).expect_partial()
        checkpoint_attributes = list_checkpoint_attributes(latest_checkpoint)
        # The checkpoint should contain model and optimizer for
        # SidecarEvaluator to work. But the model weights saved by the
        # ModelCheckpoint callback do not contain model as an attribute. To
        # make SidecarEvaluator work compatibly in this case, if the model
        # attribute is not found but a layer_with_weights attribute is found,
        # use model.load_weights to load the model's weights, while
        # self._iterations is still restored from the checkpoint variable.
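        # For example (illustrative only), a weights-only checkpoint written
        # via `model.save_weights` typically exposes top-level attributes such
        # as 'layer_with_weights-0', 'layer_with_weights-1' and 'optimizer',
        # but no 'model' attribute.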
        if 'model' not in checkpoint_attributes:
          for attribute in checkpoint_attributes:
            # Check whether the checkpoint has the required attributes for
            # model.load_weights to work.
            if re.match(r'^layer_with_weights-\d+', attribute) is not None:
              self.model.load_weights(latest_checkpoint)
              break
      except (errors_impl.OpError,) as e:
        # A couple of errors can happen here, with the coordinator racing to
        # write the checkpoint:
        # 1) OpError: open failed for <file path>: No such file or directory
        # 2) NotFoundError (subclass of OpError): Unsuccessful
        #    TensorSliceReader constructor.
        # TODO(rchao): Remove this except block once b/150954027 is resolved.
        logging.info(
            'SidecarEvaluator has an error loading '
            'checkpoint: %s. Retrying. Error: %s: %s', latest_checkpoint,
            e.__class__.__name__, e)
        continue

      if self._iterations.numpy() == _ITERATIONS_UNINITIALIZED:
        raise RuntimeError(
            '`iterations` cannot be loaded from the '
            'checkpoint file. Please ensure `iterations` is '
            'tracked in the `checkpoint` saved by the coordinator.')

      logging.info(
          'Evaluation starts: Model weights loaded from latest '
          'checkpoint file: %s.', latest_checkpoint)

      # TODO(rchao): Support arbitrary callback for extensibility.
      self.model.evaluate(self.data, steps=self.steps)

      logging.info('End of evaluation. Accuracy: %r', [
          metric.result().numpy()
          for metric in self.model.compiled_metrics.metrics
      ])

      if self._summary_writer:
        with summary_ops_v2.always_record_summaries(
        ), self._summary_writer.as_default():
          for metric in self.model.compiled_metrics.metrics:
            summary_ops_v2.scalar(
                metric.name,
                metric.result(),
                step=self._iterations.read_value())

      # TODO(rchao): Make the max evaluation robust in case users save the
      # checkpoints with epoch format {epoch:03d}.
      if (self.max_evaluations and
          latest_checkpoint.endswith('-{}'.format(self.max_evaluations))):
        # Exit the loop because we have evaluated the final checkpoint file.
        logging.info('Last checkpoint evaluated. SidecarEvaluator stops.')
        return