From e8c65fa77fb7473d95988fa23e51c906a428b27a Mon Sep 17 00:00:00 2001 From: Francois Chollet Date: Tue, 11 Dec 2018 16:45:06 -0800 Subject: [PATCH] Small refactor to improve the readability of the Model class for those who use the code as documentation. General idea: most important methods come first, private utilities are moved to the bottom of the class. Also use a single method for `_standardize_user_data` (previously split into 2 methods that did not reflect two separate sets of actions). PiperOrigin-RevId: 225094903 --- tensorflow/python/keras/engine/training.py | 1983 ++++++++--------- .../keras/engine/training_distributed.py | 7 +- 2 files changed, 968 insertions(+), 1022 deletions(-) diff --git a/tensorflow/python/keras/engine/training.py b/tensorflow/python/keras/engine/training.py index fe44bc20a1c..75d64969882 100644 --- a/tensorflow/python/keras/engine/training.py +++ b/tensorflow/python/keras/engine/training.py @@ -128,307 +128,6 @@ class Model(Network): self.run_eagerly = None - def _set_sample_weight_attributes(self, sample_weight_mode, - skip_target_weighing_indices): - """Sets sample weight related attributes on the model.""" - sample_weights, sample_weight_modes = training_utils.prepare_sample_weights( - self.output_names, sample_weight_mode, skip_target_weighing_indices) - self.sample_weights = sample_weights - self.sample_weight_modes = sample_weight_modes - self._feed_sample_weight_modes = [ - sample_weight_modes[i] - for i in range(len(self.outputs)) - if i not in skip_target_weighing_indices - ] - self._feed_sample_weights = [ - sample_weights[i] - for i in range(len(sample_weights)) - if i not in skip_target_weighing_indices - ] - - def _cache_output_metric_attributes(self, metrics, weighted_metrics): - """Caches metric name and function attributes for every model output.""" - output_shapes = [ - None if output is None else output.get_shape().as_list() - for output in self.outputs - ] - self._per_output_metrics = training_utils.collect_per_output_metric_info( - metrics, self.output_names, output_shapes, self.loss_functions) - self._per_output_weighted_metrics = \ - training_utils.collect_per_output_metric_info( - weighted_metrics, self.output_names, output_shapes, - self.loss_functions, self.sample_weights) - - def _add_unique_metric_name(self, metric_name, output_index): - """Makes the metric name unique and adds it to the model's metric name list. - - If there are multiple outputs for which the metrics are calculated, the - metric names have to be made unique by appending an integer. - - Arguments: - metric_name: Metric name that corresponds to the metric specified by the - user. For example: 'acc'. - output_index: The index of the model output for which the metric name is - being added. 
- - Returns: - string, name of the model's unique metric name - """ - if len(self.output_names) > 1: - metric_name = '%s_%s' % (self.output_names[output_index], metric_name) - j = 1 - base_metric_name = metric_name - while metric_name in self._compile_metrics_names: - metric_name = '%s_%d' % (base_metric_name, j) - j += 1 - - return metric_name - - @property - def metrics(self): - """Returns the model's metrics added using `compile`, `add_metric` APIs.""" - metrics = [] - if self._is_compiled: - metrics += self._compile_stateful_metric_functions - return metrics + super(Model, self).metrics - - @property - def metrics_names(self): - """Returns the model's display labels for all outputs.""" - metrics_names = [] - if self._is_compiled: - metrics_names += self._compile_metrics_names # Includes names of losses. - - # Add metric names from layers. - for layer in self.layers: - metrics_names += [m.name for m in layer._metrics] # pylint: disable=protected-access - metrics_names += [m.name for m in self._metrics] - return metrics_names - - @property - def _all_metrics_tensors(self): - """Returns the network's symbolic metric tensors.""" - metrics_tensors = {} - if self._is_compiled: - metrics_tensors.update(self._compile_metrics_tensors) - metrics_tensors.update(super(Model, self)._all_metrics_tensors) - return metrics_tensors - - @property - def _all_stateful_metrics_tensors(self): - """Returns the network's symbolic metric tensors.""" - metrics_tensors = {} - if self._is_compiled: - metrics_tensors.update(self._compile_stateful_metrics_tensors) - metrics_tensors.update(super(Model, self)._all_metrics_tensors) - return metrics_tensors - - def _init_metric_attributes(self): - """Initialized model metric attributes.""" - # List of all metric names in the model. - self._compile_metrics_names = ['loss'] - # List of stateful metric functions. Used for resetting metric state during - # training/eval. - # This includes loss functions when there are multiple outputs. - self._compile_stateful_metric_functions = [] - # Dict of all aggregated metric result tensors. This includes aggregated - # loss result tensors when there are multiple outputs. - self._compile_stateful_metrics_tensors = {} - # Dict of all metric result tensors (aggregated or not - based on the - # values given in compile.). This includes aggregated loss result tensors - # when there are multiple outputs. - self._compile_metrics_tensors = {} - - def _set_per_output_metric_attributes(self, metrics_dict, output_index): - """Sets the metric attributes on the model for the given output. - - Arguments: - metrics_dict: A dict with metric names as keys and metric fns as values. - output_index: The index of the model output for which the metric - attributes are added. - - Returns: - Metrics dict updated with unique metric names as keys. - """ - updated_metrics_dict = collections.OrderedDict() - for metric_name, (metric_fn, stateful_metric_fn) in metrics_dict.items(): - metric_name = self._add_unique_metric_name(metric_name, output_index) - updated_metrics_dict[metric_name] = (metric_fn, stateful_metric_fn) - # Keep track of metric name, function and stateful function. 
- self._compile_metrics_names.append(metric_name) - self._compile_stateful_metric_functions.append(stateful_metric_fn) - return updated_metrics_dict - - def _set_metric_attributes(self, outputs, skip_target_indices=None): - """Sets the metric attributes on the model for all the model outputs.""" - skip_target_indices = skip_target_indices or [] - updated_per_output_metrics = [] - updated_per_output_weighted_metrics = [] - for i in range(len(outputs)): - if i in skip_target_indices: - updated_per_output_metrics.append(self._per_output_metrics[i]) - updated_per_output_weighted_metrics.append( - self._per_output_weighted_metrics[i]) - continue - updated_per_output_metrics.append( - self._set_per_output_metric_attributes(self._per_output_metrics[i], - i)) - updated_per_output_weighted_metrics.append( - self._set_per_output_metric_attributes( - self._per_output_weighted_metrics[i], i)) - - self._per_output_metrics = updated_per_output_metrics - self._per_output_weighted_metrics = updated_per_output_weighted_metrics - - def _handle_per_output_metrics(self, - metrics_dict, - y_true, - y_pred, - mask, - weights=None, - return_stateful_result=True): - """Calls metric functions for a single output. - - Arguments: - metrics_dict: A dict with metric names as keys and metric fns as values. - y_true: Target output. - y_pred: Predicted output. - mask: Computed mask value for the current output. - weights: Weights to be applied on the current output. - return_stateful_result: Boolean, indicates whether the stateful - (aggregated)/stateless metric result should be returned. - - Returns: - A list of metric result tensors. - """ - metric_results = [] - for metric_name, (metric_fn, stateful_fn) in metrics_dict.items(): - with K.name_scope(metric_name): - - def _call_stateful_fn(fn): - return training_utils.call_metric_function( - fn, y_true, y_pred, weights=weights, mask=mask) - - def _call_stateless_fn(fn): - weighted_metric_fn = training_utils.weighted_masked_objective(fn) - return weighted_metric_fn(y_true, y_pred, weights=weights, mask=mask) - - def _track_metric_tensors(name, stateless_result, stateful_result): - self._compile_metrics_tensors[name] = stateless_result - self._compile_stateful_metrics_tensors[name] = stateful_result - - if isinstance(metric_fn, metrics_module.Metric): - # If the given metric fn is stateful, call the fn and return result. - metric_result = _call_stateful_fn(metric_fn) - metric_results.append(metric_result) - if not self.run_eagerly: - _track_metric_tensors(metric_name, metric_result, metric_result) - elif self.run_eagerly: - # In eager mode, if the given metric fn is not stateful, we invoke the - # given fn or its stateful version based on the given flag. - if return_stateful_result: - metric_result = _call_stateful_fn(stateful_fn) - else: - metric_result = _call_stateless_fn(metric_fn) - metric_results.append(metric_result) - else: - # In graph mode, we build the sub-graph for both the stateful and the - # stateless fns. - stateful_metric_result = _call_stateful_fn(stateful_fn) - metric_result = _call_stateless_fn(metric_fn) - _track_metric_tensors(metric_name, metric_result, - stateful_metric_result) - - return metric_results - - def _handle_metrics(self, - outputs, - skip_target_indices=None, - targets=None, - sample_weights=None, - masks=None, - return_stateful_result=True): - """Handles calling metric functions. - - Arguments: - outputs: List of outputs (predictions). - skip_target_indices: Optional. List of target ids to skip. - targets: List of targets. 
- sample_weights: Optional list of sample weight arrays. - masks: List of computed output mask values. - return_stateful_result: Boolean, indicates whether the stateful - (aggregated)/stateless metric result should be returned. - - Returns: - A list of metric result tensors. - """ - skip_target_indices = skip_target_indices or [] - metric_results = [] - with K.name_scope('metrics'): - # Invoke all metrics added using `compile`. - for i in range(len(outputs)): - if i in skip_target_indices: - continue - output = outputs[i] if outputs else None - target = targets[i] if targets else None - output_mask = masks[i] if masks else None - metric_results.extend( - self._handle_per_output_metrics( - self._per_output_metrics[i], - target, - output, - output_mask, - return_stateful_result=return_stateful_result)) - metric_results.extend( - self._handle_per_output_metrics( - self._per_output_weighted_metrics[i], - target, - output, - output_mask, - weights=sample_weights[i], - return_stateful_result=return_stateful_result)) - - # Add metric results from the `add_metric` metrics in eager mode. - if context.executing_eagerly(): - for m in self.metrics: - if m not in self._compile_stateful_metric_functions: - metric_results.append(m.result()) - return metric_results - - @property - def run_eagerly(self): - """Settable attribute indicating whether the model should run eagerly. - - Running eagerly means that your model will be run step by step, - like Python code. Your model might run slower, but it should become easier - for you to debug it by stepping into individual layer calls. - - By default, we will attempt to compile your model to a static graph to - deliver the best execution performance. - - Returns: - Boolean, whether the model should run eagerly. - """ - if self._run_eagerly is True and not context.executing_eagerly(): - raise ValueError('You can only set `run_eagerly=True` if eager execution ' - 'is enabled.') - if self._static_graph_friendly: - if self._run_eagerly is None: - return False - else: - return self._run_eagerly - else: - if self._run_eagerly is False: - # TODO(fchollet): consider using py_func to enable this. - raise ValueError('Your model contains layers that can only be ' - 'successfully run in eager execution. ' - 'You cannot set `run_eagerly=False`.') - return context.executing_eagerly() - - @run_eagerly.setter - def run_eagerly(self, value): - self._run_eagerly = value - @checkpointable.no_automatic_dependency_tracking def compile(self, optimizer, @@ -814,733 +513,60 @@ class Model(Network): trainable_weights = self.trainable_weights self._collected_trainable_weights = trainable_weights - def _check_trainable_weights_consistency(self): - """Check trainable weights count consistency. + @property + def metrics(self): + """Returns the model's metrics added using `compile`, `add_metric` APIs.""" + metrics = [] + if self._is_compiled: + metrics += self._compile_stateful_metric_functions + return metrics + super(Model, self).metrics - This will raise a warning if `trainable_weights` and - `_collected_trainable_weights` are inconsistent (i.e. have different - number of parameters). - Inconsistency will typically arise when one modifies `model.trainable` - without calling `model.compile` again. - """ - if not hasattr(self, '_collected_trainable_weights'): - return + @property + def metrics_names(self): + """Returns the model's display labels for all outputs.""" + metrics_names = [] + if self._is_compiled: + metrics_names += self._compile_metrics_names # Includes names of losses. 
- if len(self.trainable_weights) != len(self._collected_trainable_weights): - logging.log_first_n( - logging.WARN, 'Discrepancy between trainable weights and collected' - ' trainable weights, did you set `model.trainable`' - ' without calling `model.compile` after ?', 1) + # Add metric names from layers. + for layer in self.layers: + metrics_names += [m.name for m in layer._metrics] # pylint: disable=protected-access + metrics_names += [m.name for m in self._metrics] + return metrics_names - def _make_train_function_helper(self, fn_name, outputs, metric_updates=None): - if not hasattr(self, fn_name): - raise RuntimeError('You must compile your model before using it.') - self._check_trainable_weights_consistency() - if getattr(self, fn_name) is None: - inputs = (self._feed_inputs + - self._feed_targets + - self._feed_sample_weights) - if not isinstance(K.symbolic_learning_phase(), int): - inputs += [K.symbolic_learning_phase()] + @property + def run_eagerly(self): + """Settable attribute indicating whether the model should run eagerly. - with K.get_graph().as_default(): - with K.name_scope('training'): - with K.name_scope(self.optimizer.__class__.__name__): - # Training updates - updates = self.optimizer.get_updates( - params=self._collected_trainable_weights, loss=self.total_loss) - # Unconditional updates - updates += self.get_updates_for(None) - # Conditional updates relevant to this model - updates += self.get_updates_for(self.inputs) - # Add stateful metrics updates. - if metric_updates is not None: - updates += metric_updates + Running eagerly means that your model will be run step by step, + like Python code. Your model might run slower, but it should become easier + for you to debug it by stepping into individual layer calls. - with K.name_scope('training'): - # Gets loss and metrics. Updates weights at each call. - fn = K.function( - inputs, - outputs, - updates=updates, - name='train_function', - **self._function_kwargs) - setattr(self, fn_name, fn) - - def _make_train_function(self): - metrics_tensors = [ - self._all_metrics_tensors[m] for m in self.metrics_names[1:] - ] - self._make_train_function_helper('train_function', - [self.total_loss] + metrics_tensors) - - def _make_fit_function(self): - metrics_tensors = [ - self._all_stateful_metrics_tensors[m] for m in self.metrics_names[1:] - ] - self._make_train_function_helper( - '_fit_function', [self.total_loss] + metrics_tensors) - - def _make_test_function_helper(self, fn_name, outputs, metric_updates=None): - if not hasattr(self, fn_name): - raise RuntimeError('You must compile your model before using it.') - if getattr(self, fn_name) is None: - inputs = (self._feed_inputs + - self._feed_targets + - self._feed_sample_weights) - - with K.name_scope('evaluation'): - updates = self.state_updates - # Add stateful metrics updates. - if metric_updates is not None: - updates += metric_updates - # Return loss and metrics, no gradient updates. - # Does update the network states. 
- fn = K.function( - inputs, - outputs, - updates=updates, - name='test_function', - **self._function_kwargs) - setattr(self, fn_name, fn) - - def _make_test_function(self): - metrics_tensors = [ - self._all_metrics_tensors[m] for m in self.metrics_names[1:] - ] - self._make_test_function_helper('test_function', - [self.total_loss] + metrics_tensors) - - def _make_eval_function(self): - metrics_tensors = [ - self._all_stateful_metrics_tensors[m] for m in self.metrics_names[1:] - ] - self._make_test_function_helper( - '_eval_function', [self.total_loss] + metrics_tensors) - - def _make_predict_function(self): - if not hasattr(self, 'predict_function'): - self.predict_function = None - if self.predict_function is None: - inputs = self._feed_inputs - # Gets network outputs. Does not update weights. - # Does update the network states. - kwargs = getattr(self, '_function_kwargs', {}) - with K.name_scope('predict'): - self.predict_function = K.function( - inputs, - self.outputs, - updates=self.state_updates, - name='predict_function', - **kwargs) - - def _make_execution_function(self, mode): - if mode == 'train': - self._make_fit_function() - return self._fit_function - if mode == 'test': - self._make_eval_function() - return self._eval_function - if mode == 'predict': - self._make_predict_function() - return self.predict_function - - def _get_iterator_get_next_tensors(self, iterator): - get_next_op = self._iterator_get_next.get(iterator, None) - if get_next_op is None: - get_next_op = iterator.get_next() - self._iterator_get_next[iterator] = get_next_op - return get_next_op - - def _distribution_standardize_user_data(self, - x, - y=None, - sample_weight=None, - class_weight=None, - batch_size=None, - check_steps=False, - steps_name='steps', - steps=None, - validation_split=0, - shuffle=False): - """Runs validation checks on input and target data passed by the user. - - This is called when using DistributionStrategy to train, evaluate or serve - the model. - - Args: - x: Input data. A numpy array or `tf.data` dataset. - y: Target data. A numpy array or None if x is a `tf.data` dataset. - sample_weight: An optional sample-weight array passed by the user to - weight the importance of each sample in `x`. - class_weight: An optional class-weight array by the user to - weight the importance of samples in `x` based on the class they belong - to, as conveyed by `y`. - batch_size: Integer batch size. If provided, it is used to run additional - validation checks on stateful models. - check_steps: boolean, True if we want to check for validity of `steps` and - False, otherwise. - steps_name: The public API's parameter name for `steps`. - steps: Integer or `None`. Total number of steps (batches of samples) to - execute. - validation_split: Float between 0 and 1. - Fraction of the training data to be used as validation data. - shuffle: Boolean whether to shuffle the training data before each epoch. + By default, we will attempt to compile your model to a static graph to + deliver the best execution performance. Returns: - Iterator for reading the dataset `x`. - - Raises: - ValueError: In case of invalid user-provided data. - RuntimeError: If the model was never compiled. + Boolean, whether the model should run eagerly. 
""" - if class_weight: - raise NotImplementedError('`class_weight` is currently not supported ' - 'when using DistributionStrategy.') - - if (sample_weight is not None and sample_weight.all() and - distributed_training_utils.is_tpu_strategy( - self._distribution_strategy)): - raise NotImplementedError('`sample_weight` is currently not supported ' - 'when using TPUStrategy.') - - # Validates `steps` argument right at the beginning since we use it to - # construct the dataset object. - # TODO(anjalisridhar): Remove this check once we refactor the - # _standardize_user_data code path. This check is already present elsewhere - # in the codebase. - if check_steps and isinstance(x, dataset_ops.DatasetV2) and steps is None: - raise ValueError('When using Datasets as input, ' - 'you should specify the `{steps_name}` argument.' - .format(steps_name=steps_name)) - - first_x_value = nest.flatten(x)[0] - if isinstance(first_x_value, np.ndarray): - # We need to use the drop_remainder argument to allow for a static - # input shape which is required for TPUs. - drop_remainder = self._distribution_strategy.require_static_shapes - if y is not None: - var_x = distributed_training_utils.get_var_for_numpy( - self._distribution_strategy, x) - var_y = distributed_training_utils.get_var_for_numpy( - self._distribution_strategy, y) - if sample_weight is not None: - var_sample_weights = distributed_training_utils.get_var_for_numpy( - self._distribution_strategy, sample_weight) - - x = dataset_ops.Dataset.from_tensor_slices((var_x, var_y, - var_sample_weights)) - else: - x = dataset_ops.Dataset.from_tensor_slices((var_x, var_y)) - - x = dataset_ops.Dataset.from_tensor_slices((var_x, var_y)) - if shuffle: - # 1024 is a good buffer size since it is much larger than the average - # batch size provided by the user and provides sufficient randomness. - # One thing to keep in mind is the memory usage based on the size of - # each sample. - x = x.shuffle(1024) - x = x.repeat() - x = x.batch(batch_size, drop_remainder=drop_remainder) - y = None - sample_weight = None + if self._run_eagerly is True and not context.executing_eagerly(): + raise ValueError('You can only set `run_eagerly=True` if eager execution ' + 'is enabled.') + if self._static_graph_friendly: + if self._run_eagerly is None: + return False else: - # This case is for the predict call where the dataset only contains - # inputs and no targets, i.e. it does not return a tuple - var_x = distributed_training_utils.get_var_for_numpy( - self._distribution_strategy, x) - x = dataset_ops.Dataset.from_tensor_slices(var_x) - x = x.batch(batch_size, drop_remainder=drop_remainder) - - assert isinstance(x, dataset_ops.DatasetV2) - - with self._distribution_strategy.scope(): - iterator = self._distribution_strategy.make_dataset_iterator(x) - init_op = iterator.initialize() - if not context.executing_eagerly(): - K.get_session().run(init_op) - - training_utils.validate_iterator_input(x, y, sample_weight, - validation_split) - return iterator - - def _standardize_user_data(self, - x, - y=None, - sample_weight=None, - class_weight=None, - batch_size=None, - check_steps=False, - steps_name='steps', - steps=None, - validation_split=0, - shuffle=False): - """Runs validation checks on input and target data passed by the user. - - Also standardizes the data to lists of arrays, in order. - - Also builds and compiles the model on the fly if it is a subclassed model - that has never been called before (and thus has no inputs/outputs). 
- - This is a purely internal method, subject to refactoring at any time. - - Args: - x: Input data. It could be: - - A Numpy array (or array-like), or a list of arrays - (in case the model has multiple inputs). - - A TensorFlow tensor, or a list of tensors - (in case the model has multiple inputs). - - A dict mapping input names to the corresponding array/tensors, - if the model has named inputs. - - A `tf.data` dataset or a dataset iterator. - y: Target data. Like the input data `x`, - it could be either Numpy array(s) or TensorFlow tensor(s). - It should be consistent with `x` (you cannot have Numpy inputs and - tensor targets, or inversely). If `x` is a dataset or a - dataset iterator, `y` should not be specified - (since targets will be obtained from the iterator). - sample_weight: An optional sample-weight array passed by the user to - weight the importance of each sample in `x`. - class_weight: An optional class-weight array by the user to - weight the importance of samples in `x` based on the class they belong - to, as conveyed by `y`. - batch_size: Integer batch size. If provided, it is used to run additional - validation checks on stateful models. - check_steps: boolean, True if we want to check for validity of `steps` and - False, otherwise. For example, when we are standardizing one batch of - data for train_on_batch/predict_on_batch/test_on_batch APIs, `steps` - value is not required and we should not check for its validity in these - cases. - steps_name: The public API's parameter name for `steps`. - steps: Integer or `None`. Total number of steps (batches of samples) to - execute. - validation_split: Float between 0 and 1. - Fraction of the training data to be used as validation data. - shuffle: Boolean whether to shuffle the training data before each epoch. - - Returns: - A tuple of 3: inputs (arrays or dicts, depending on whether `x` was a dict - or not), target arrays, sample-weight arrays. - If the model's input and targets are symbolic, these lists are empty - (since the model takes no user-provided data, instead the data comes - from the symbolic inputs/targets). - - Raises: - ValueError: In case of invalid user-provided data. - RuntimeError: If the model was never compiled. - """ - if self._distribution_strategy: - iterator = self._distribution_standardize_user_data( - x, - y, - sample_weight=sample_weight, - class_weight=class_weight, - batch_size=batch_size, - check_steps=check_steps, - steps_name=steps_name, - steps=steps, - validation_split=validation_split, - shuffle=shuffle) - return iterator, None, None - - if isinstance(x, dataset_ops.DatasetV2): - if context.executing_eagerly(): - x = iter(x) - else: - if x in self._dataset_iterator_cache: - x = self._dataset_iterator_cache[x] - else: - iterator = dataset_ops.make_initializable_iterator(x) - self._dataset_iterator_cache[x] = iterator - x = iterator - K.get_session().run(x.initializer) - - # Validates `steps` argument based on x's type. - if check_steps: - training_utils.check_steps_argument(x, steps, steps_name) - - is_x_eager_iterator = isinstance(x, iterator_ops.EagerIterator) - is_x_iterator = isinstance(x, iterator_ops.Iterator) - - # Validate user inputs when data is given as a dataset or dataset iterator. - if is_x_iterator or is_x_eager_iterator: - training_utils.validate_iterator_input(x, y, sample_weight, - validation_split) - - # For eager iterators, when we have to process multiple batches of samples, - # we will standardize the data when we actually loop over iterator and get - # the batches. 
For now, we just return the iterator as is. - if is_x_eager_iterator: - return x, y, sample_weight - - # If input data is a dataset iterator in graph mode or if it is an eager - # iterator and only one batch of samples is required, we fetch the data - # tensors from the iterator and then standardize them. - if is_x_iterator or is_x_eager_iterator: - try: - if is_x_iterator: - next_element = self._get_iterator_get_next_tensors(x) - else: - next_element = x.get_next() - except errors.OutOfRangeError: - raise RuntimeError('Your dataset iterator ran out of data; ' - 'Make sure that your dataset can generate ' - 'required number of samples.') - - if isinstance(next_element, (list, tuple)): - if len(next_element) not in [2, 3]: - raise ValueError( - 'Please provide model inputs as a list or tuple of 2 or 3' - 'elements: (input, target) or (input, target, sample_weights)' - 'Received %s' % next_element) - if len(next_element) == 2: - x, y = next_element - else: - x, y, sample_weight = next_element - else: - x = next_element - x, y, sample_weights = self._standardize_weights( - x, y, sample_weight, class_weight, batch_size, is_x_iterator) - return x, y, sample_weights - - def _standardize_weights(self, - x, - y, - sample_weight=None, - class_weight=None, - batch_size=None, - from_iterator=False): - """Standardize input data, target data, and weight values. - - This method reformats all data passed to the model to an ordered list of - array/tensors, matching the order expected by the model. This also validates - the input and target data shapes. - - Args: - x: Input data. It could be: - - A Numpy array (or array-like), or a list of arrays - (in case the model has multiple inputs). - - A TensorFlow tensor, or a list of tensors - (in case the model has multiple inputs). - - A dict mapping input names to the corresponding array/tensors, - if the model has named inputs. - x cannot not be an iterator. - y: Target data. Like the input data `x`, - it could be either Numpy array(s) or TensorFlow tensor(s). - It should be consistent with `x` (you cannot have Numpy inputs and - tensor targets, or inversely). - sample_weight: An optional sample-weight array passed by the user to - weight the importance of each sample in `x`. - class_weight: An optional class-weight array by the user to - weight the importance of samples in `x` based on the class they belong - to, as conveyed by `y`. - batch_size: Integer batch size. If provided, it is used to run additional - validation checks on stateful models. - from_iterator: Whether x and y were obtained from an iterator. - - Returns: - Tuple of standardized data that will be fed to the model: - (input data, target data, sample weights) - - Raises: - RuntimeError: If target data is provided, but the model has not yet been - compiled. - ValueError: If the input data, target data, and batch size have invalid - shapes or formats (e.g. the model expects input to be a list of three - tensors, but x is a list with two tensors). Error is also raised if the - input and target data are not both arrays or tensors. - """ - # TODO(sourabhbajaj): Split input validation from weight standardization. - if sample_weight is not None and class_weight is not None: - logging.warning( - 'Received both a `sample_weight` and `class_weight` argument. ' - 'The `class_weight` argument will be ignored.') - # First, we build/compile the model on the fly if necessary. 
- all_inputs = [] - is_build_called = False - is_compile_called = False - # Whether this is a subclassed model that expects dictionary inputs - # rather than list inputs (e.g. FeatureColumn-based models). - dict_inputs = False - if not self.inputs: - # We need to use `x` to set the model inputs. - # We type-check that `x` and `y` are either single arrays - # or lists of arrays. - if isinstance(x, (list, tuple)): - if not all(isinstance(v, np.ndarray) or - tensor_util.is_tensor(v) for v in x): - raise ValueError('Please provide as model inputs either a single ' - 'array or a list of arrays. You passed: x=' + str(x)) - all_inputs += list(x) - elif isinstance(x, dict): - dict_inputs = True - keys = sorted(x.keys()) - all_inputs = [x[k] for k in keys] - else: - if not isinstance(x, np.ndarray) and not tensor_util.is_tensor(x): - raise ValueError('Please provide as model inputs either a single ' - 'array or a list of arrays. You passed: x=' + str(x)) - all_inputs.append(x) - - # Build the model using the retrieved inputs (value or symbolic). - # If values or generated from a dataset, then in symbolic-mode - # placeholders will be created to match the value shapes. - if not self.inputs: - is_build_called = True - if from_iterator: - cast_inputs = nest.map_structure(lambda v: v.shape, x) - elif training_utils.has_tensors(x): - cast_inputs = training_utils.cast_if_floating_dtype(x) - else: - cast_inputs = x - self._set_inputs(cast_inputs) + return self._run_eagerly else: - dict_inputs = isinstance(self.inputs, dict) - if dict_inputs and context.executing_eagerly(): - # No support for graph functions when the model expects dictionary inputs - # (i.e. FeatureColumn-based models). - self.run_eagerly = True + if self._run_eagerly is False: + # TODO(fchollet): consider using py_func to enable this. + raise ValueError('Your model contains layers that can only be ' + 'successfully run in eager execution. ' + 'You cannot set `run_eagerly=False`.') + return context.executing_eagerly() - if y is not None: - if not self.optimizer: - raise RuntimeError('You must compile a model before ' - 'training/testing. ' - 'Use `model.compile(optimizer, loss)`.') - if not self._is_compiled: - # On-the-fly compilation of the model. - # We need to use `y` to set the model targets. - if training_utils.has_tensors(y): - y = training_utils.cast_if_floating_dtype(y) - if isinstance(y, (list, tuple)): - if not all(isinstance(v, np.ndarray) or - tensor_util.is_tensor(v) for v in y): - raise ValueError('Please provide as model targets either a single ' - 'array or a list of arrays. ' - 'You passed: y=' + str(y)) - all_inputs += list(y) - elif isinstance(y, dict): - raise ValueError('Please do not pass a dictionary as model targets.') - else: - if not isinstance(y, np.ndarray) and not tensor_util.is_tensor(y): - raise ValueError('Please provide as model targets either a single ' - 'array or a list of arrays. ' - 'You passed: y=' + str(y)) - all_inputs.append(y) - - # Typecheck that all inputs are *either* value *or* symbolic. - # TODO(fchollet): this check could be removed in Eager mode? - if any(tensor_util.is_tensor(v) for v in all_inputs): - if not all(tensor_util.is_tensor(v) for v in all_inputs): - raise ValueError('Do not pass inputs that mix Numpy arrays and ' - 'TensorFlow tensors. ' - 'You passed: x=' + str(x) + '; y=' + str(y)) - - if self.run_eagerly or from_iterator: - target_tensors = None - else: - # Handle target tensors if any passed. 
- if not isinstance(y, (list, tuple)): - y = [y] - target_tensors = [v for v in y if _is_symbolic_tensor(v)] - is_compile_called = True - self.compile( - optimizer=self.optimizer, - loss=self.loss, - metrics=self._compile_metrics, - weighted_metrics=self._compile_weighted_metrics, - loss_weights=self.loss_weights, - target_tensors=target_tensors, - run_eagerly=self.run_eagerly) - - # In graph mode, if we had just set inputs and targets as symbolic tensors - # by invoking build and compile on the model respectively, we do not have to - # feed anything to the model. Model already has input and target data as - # part of the graph. - # Note: in this case, `any` and `all` are equivalent since we disallow - # mixed symbolic/value inputs. - if (not self.run_eagerly and is_build_called and is_compile_called and - not from_iterator and any(_is_symbolic_tensor(v) for v in all_inputs)): - return [], [], [] - - # What follows is input validation and standardization to list format, - # in the case where all inputs are value arrays. - - if self.run_eagerly: - # In eager mode, do not do shape validation - # since the network has no input nodes (placeholders) to be fed. - feed_input_names = self.input_names - feed_input_shapes = None - elif not self._is_graph_network: - # Case: symbolic-mode subclassed network. Do not do shape validation. - feed_input_names = self._feed_input_names - feed_input_shapes = None - else: - # Case: symbolic-mode graph network. - # In this case, we run extensive shape validation checks. - feed_input_names = self._feed_input_names - feed_input_shapes = self._feed_input_shapes - - # Standardize the inputs. - x = training_utils.standardize_input_data( - x, - feed_input_names, - feed_input_shapes, - check_batch_axis=False, # Don't enforce the batch size. - exception_prefix='input') - - if y is not None: - if not self._is_graph_network: - feed_output_names = self._feed_output_names - feed_output_shapes = None - # Sample weighting not supported in this case. - # TODO(fchollet): consider supporting it. - feed_sample_weight_modes = [None for _ in self.outputs] - else: - feed_output_names = self._feed_output_names - feed_sample_weight_modes = self._feed_sample_weight_modes - feed_output_shapes = [] - for output_shape, loss_fn in zip(self._feed_output_shapes, - self._feed_loss_fns): - if loss_fn is losses.sparse_categorical_crossentropy: - if K.image_data_format() == 'channels_first': - feed_output_shapes.append( - (output_shape[0], 1) + output_shape[2:]) - else: - feed_output_shapes.append(output_shape[:-1] + (1,)) - elif (not hasattr(loss_fn, '__name__') or - getattr(losses, loss_fn.__name__, None) is None): - # If `loss_fn` is not a function (e.g. callable class) - # or if it not in the `losses` module, then - # it is a user-defined loss and we make no assumptions - # about it. - feed_output_shapes.append(None) - else: - feed_output_shapes.append(output_shape) - - # Standardize the outputs. - y = training_utils.standardize_input_data( - y, - feed_output_names, - # Don't enforce target shapes to match output shapes. - # Precise checks will be run in `check_loss_and_target_compatibility`. - shapes=None, - check_batch_axis=False, # Don't enforce the batch size. - exception_prefix='target') - - # Generate sample-wise weight values given the `sample_weight` and - # `class_weight` arguments. 
- sample_weights = training_utils.standardize_sample_weights( - sample_weight, feed_output_names) - class_weights = training_utils.standardize_class_weights( - class_weight, feed_output_names) - sample_weights = [ - training_utils.standardize_weights(ref, sw, cw, mode) - for (ref, sw, cw, mode) in zip(y, sample_weights, class_weights, - feed_sample_weight_modes) - ] - # Check that all arrays have the same length. - if not self._distribution_strategy: - training_utils.check_array_lengths(x, y, sample_weights) - if self._is_graph_network and not self.run_eagerly: - # Additional checks to avoid users mistakenly using improper loss fns. - training_utils.check_loss_and_target_compatibility( - y, self._feed_loss_fns, feed_output_shapes) - else: - y = [] - sample_weights = [] - - if self.stateful and batch_size: - # Check that for stateful networks, number of samples is a multiple - # of the static batch size. - if x[0].shape[0] % batch_size != 0: - raise ValueError('In a stateful network, ' - 'you should only pass inputs with ' - 'a number of samples that can be ' - 'divided by the batch size. Found: ' + - str(x[0].shape[0]) + ' samples') - - # If dictionary inputs were provided, we return a dictionary as well. - if dict_inputs: - x = dict(zip(feed_input_names, x)) - return x, y, sample_weights - - @checkpointable.no_automatic_dependency_tracking - def _set_inputs(self, inputs, outputs=None, training=None): - """Set model's input and output specs based on the input data received. - - This is to be used for Model subclasses, which do not know at instantiation - time what their inputs look like. - - Args: - inputs: Single array, or list of arrays. The arrays could be placeholders, - Numpy arrays, data tensors, or TensorShapes. - - if placeholders: the model is built on top of these placeholders, - and we expect Numpy data to be fed for them when calling `fit`/etc. - - if Numpy data or TensorShapes: we create placeholders matching the - TensorShapes or shapes of the Numpy arrays. We expect Numpy data to be - fed for these placeholders when calling `fit`/etc. - - if data tensors: the model is built on top of these tensors. - We do not expect any Numpy data to be provided when calling `fit`/etc. - outputs: None, a data tensor, or a list of tensors. If None, the - outputs will be determined by invoking `self.call()`, otherwise the - provided value will be used. - training: Boolean or None. Only relevant in symbolic mode. Specifies - whether to build the model's graph in inference mode (False), training - mode (True), or using the Keras learning phase (None). - Raises: - ValueError: If dict inputs are passed to a Sequential Model where the - first layer isn't FeatureLayer. - """ - if self.inputs: - raise ValueError('Model inputs are already set.') - - if self.__class__.__name__ == 'Sequential' and not self.built: - if tensor_util.is_tensor(inputs): - input_shape = (None,) + tuple(inputs.shape.as_list()[1:]) - elif isinstance(inputs, tensor_shape.TensorShape): - input_shape = (None,) + tuple(inputs.as_list()[1:]) - elif isinstance(inputs, dict): - # We assert that the first layer is a FeatureLayer. 
- if not training_utils.is_feature_layer(self.layers[0]): - raise ValueError('Passing a dictionary input to a Sequential Model ' - 'which doesn\'t have FeatureLayer as the first layer' - ' is an error.') - input_shape = (None,) - else: - input_shape = (None,) + tuple(inputs.shape[1:]) - self._build_input_shape = input_shape - - # On-the-fly setting of symbolic model inputs (either by using the tensor - # provided, or by creating a placeholder if Numpy data was provided). - model_inputs = training_utils.ModelInputs(inputs) - inputs = model_inputs.get_symbolic_inputs() - self.inputs = model_inputs.get_symbolic_inputs(return_single_as_list=True) - self.input_names = model_inputs.get_input_names() - - self._feed_inputs = [] - self._feed_input_names = [] - self._feed_input_shapes = [] - - for k, v in model_inputs.as_dict(): - if K.is_placeholder(v): - self._feed_inputs.append(v) - self._feed_input_names.append(k) - self._feed_input_shapes.append(K.int_shape(v)) - - # TODO(fchollet): consider calling `_maybe_build` before calling the model. - - if outputs is None: - # Obtain symbolic outputs by calling the model. - with K.get_graph().as_default(): - if self._expects_training_arg: - outputs = self.call(inputs, training=training) - else: - outputs = self.call(inputs) - - outputs = nest.flatten(outputs) - self.outputs = outputs - self.output_names = training_utils.generic_output_names(outputs) - self.built = True + @run_eagerly.setter + def run_eagerly(self, value): + self._run_eagerly = value def fit(self, x=None, @@ -2583,6 +1609,929 @@ class Model(Network): def _default_save_signature(self): return training_utils.trace_model_call(self) + def _set_sample_weight_attributes(self, sample_weight_mode, + skip_target_weighing_indices): + """Sets sample weight related attributes on the model.""" + sample_weights, sample_weight_modes = training_utils.prepare_sample_weights( + self.output_names, sample_weight_mode, skip_target_weighing_indices) + self.sample_weights = sample_weights + self.sample_weight_modes = sample_weight_modes + self._feed_sample_weight_modes = [ + sample_weight_modes[i] + for i in range(len(self.outputs)) + if i not in skip_target_weighing_indices + ] + self._feed_sample_weights = [ + sample_weights[i] + for i in range(len(sample_weights)) + if i not in skip_target_weighing_indices + ] + + def _cache_output_metric_attributes(self, metrics, weighted_metrics): + """Caches metric name and function attributes for every model output.""" + output_shapes = [ + None if output is None else output.get_shape().as_list() + for output in self.outputs + ] + self._per_output_metrics = training_utils.collect_per_output_metric_info( + metrics, self.output_names, output_shapes, self.loss_functions) + self._per_output_weighted_metrics = \ + training_utils.collect_per_output_metric_info( + weighted_metrics, self.output_names, output_shapes, + self.loss_functions, self.sample_weights) + + def _add_unique_metric_name(self, metric_name, output_index): + """Makes the metric name unique and adds it to the model's metric name list. + + If there are multiple outputs for which the metrics are calculated, the + metric names have to be made unique by appending an integer. + + Arguments: + metric_name: Metric name that corresponds to the metric specified by the + user. For example: 'acc'. + output_index: The index of the model output for which the metric name is + being added. 
+ + Returns: + string, name of the model's unique metric name + """ + if len(self.output_names) > 1: + metric_name = '%s_%s' % (self.output_names[output_index], metric_name) + j = 1 + base_metric_name = metric_name + while metric_name in self._compile_metrics_names: + metric_name = '%s_%d' % (base_metric_name, j) + j += 1 + + return metric_name + + @property + def _all_metrics_tensors(self): + """Returns the network's symbolic metric tensors.""" + metrics_tensors = {} + if self._is_compiled: + metrics_tensors.update(self._compile_metrics_tensors) + metrics_tensors.update(super(Model, self)._all_metrics_tensors) + return metrics_tensors + + @property + def _all_stateful_metrics_tensors(self): + """Returns the network's symbolic metric tensors.""" + metrics_tensors = {} + if self._is_compiled: + metrics_tensors.update(self._compile_stateful_metrics_tensors) + metrics_tensors.update(super(Model, self)._all_metrics_tensors) + return metrics_tensors + + def _init_metric_attributes(self): + """Initialized model metric attributes.""" + # List of all metric names in the model. + self._compile_metrics_names = ['loss'] + # List of stateful metric functions. Used for resetting metric state during + # training/eval. + # This includes loss functions when there are multiple outputs. + self._compile_stateful_metric_functions = [] + # Dict of all aggregated metric result tensors. This includes aggregated + # loss result tensors when there are multiple outputs. + self._compile_stateful_metrics_tensors = {} + # Dict of all metric result tensors (aggregated or not - based on the + # values given in compile.). This includes aggregated loss result tensors + # when there are multiple outputs. + self._compile_metrics_tensors = {} + + def _set_per_output_metric_attributes(self, metrics_dict, output_index): + """Sets the metric attributes on the model for the given output. + + Arguments: + metrics_dict: A dict with metric names as keys and metric fns as values. + output_index: The index of the model output for which the metric + attributes are added. + + Returns: + Metrics dict updated with unique metric names as keys. + """ + updated_metrics_dict = collections.OrderedDict() + for metric_name, (metric_fn, stateful_metric_fn) in metrics_dict.items(): + metric_name = self._add_unique_metric_name(metric_name, output_index) + updated_metrics_dict[metric_name] = (metric_fn, stateful_metric_fn) + # Keep track of metric name, function and stateful function. 
+ self._compile_metrics_names.append(metric_name) + self._compile_stateful_metric_functions.append(stateful_metric_fn) + return updated_metrics_dict + + def _set_metric_attributes(self, outputs, skip_target_indices=None): + """Sets the metric attributes on the model for all the model outputs.""" + skip_target_indices = skip_target_indices or [] + updated_per_output_metrics = [] + updated_per_output_weighted_metrics = [] + for i in range(len(outputs)): + if i in skip_target_indices: + updated_per_output_metrics.append(self._per_output_metrics[i]) + updated_per_output_weighted_metrics.append( + self._per_output_weighted_metrics[i]) + continue + updated_per_output_metrics.append( + self._set_per_output_metric_attributes(self._per_output_metrics[i], + i)) + updated_per_output_weighted_metrics.append( + self._set_per_output_metric_attributes( + self._per_output_weighted_metrics[i], i)) + + self._per_output_metrics = updated_per_output_metrics + self._per_output_weighted_metrics = updated_per_output_weighted_metrics + + def _handle_per_output_metrics(self, + metrics_dict, + y_true, + y_pred, + mask, + weights=None, + return_stateful_result=True): + """Calls metric functions for a single output. + + Arguments: + metrics_dict: A dict with metric names as keys and metric fns as values. + y_true: Target output. + y_pred: Predicted output. + mask: Computed mask value for the current output. + weights: Weights to be applied on the current output. + return_stateful_result: Boolean, indicates whether the stateful + (aggregated)/stateless metric result should be returned. + + Returns: + A list of metric result tensors. + """ + metric_results = [] + for metric_name, (metric_fn, stateful_fn) in metrics_dict.items(): + with K.name_scope(metric_name): + + def _call_stateful_fn(fn): + return training_utils.call_metric_function( + fn, y_true, y_pred, weights=weights, mask=mask) + + def _call_stateless_fn(fn): + weighted_metric_fn = training_utils.weighted_masked_objective(fn) + return weighted_metric_fn(y_true, y_pred, weights=weights, mask=mask) + + def _track_metric_tensors(name, stateless_result, stateful_result): + self._compile_metrics_tensors[name] = stateless_result + self._compile_stateful_metrics_tensors[name] = stateful_result + + if isinstance(metric_fn, metrics_module.Metric): + # If the given metric fn is stateful, call the fn and return result. + metric_result = _call_stateful_fn(metric_fn) + metric_results.append(metric_result) + if not self.run_eagerly: + _track_metric_tensors(metric_name, metric_result, metric_result) + elif self.run_eagerly: + # In eager mode, if the given metric fn is not stateful, we invoke the + # given fn or its stateful version based on the given flag. + if return_stateful_result: + metric_result = _call_stateful_fn(stateful_fn) + else: + metric_result = _call_stateless_fn(metric_fn) + metric_results.append(metric_result) + else: + # In graph mode, we build the sub-graph for both the stateful and the + # stateless fns. + stateful_metric_result = _call_stateful_fn(stateful_fn) + metric_result = _call_stateless_fn(metric_fn) + _track_metric_tensors(metric_name, metric_result, + stateful_metric_result) + + return metric_results + + def _handle_metrics(self, + outputs, + skip_target_indices=None, + targets=None, + sample_weights=None, + masks=None, + return_stateful_result=True): + """Handles calling metric functions. + + Arguments: + outputs: List of outputs (predictions). + skip_target_indices: Optional. List of target ids to skip. + targets: List of targets. 
+ sample_weights: Optional list of sample weight arrays. + masks: List of computed output mask values. + return_stateful_result: Boolean, indicates whether the stateful + (aggregated)/stateless metric result should be returned. + + Returns: + A list of metric result tensors. + """ + skip_target_indices = skip_target_indices or [] + metric_results = [] + with K.name_scope('metrics'): + # Invoke all metrics added using `compile`. + for i in range(len(outputs)): + if i in skip_target_indices: + continue + output = outputs[i] if outputs else None + target = targets[i] if targets else None + output_mask = masks[i] if masks else None + metric_results.extend( + self._handle_per_output_metrics( + self._per_output_metrics[i], + target, + output, + output_mask, + return_stateful_result=return_stateful_result)) + metric_results.extend( + self._handle_per_output_metrics( + self._per_output_weighted_metrics[i], + target, + output, + output_mask, + weights=sample_weights[i], + return_stateful_result=return_stateful_result)) + + # Add metric results from the `add_metric` metrics in eager mode. + if context.executing_eagerly(): + for m in self.metrics: + if m not in self._compile_stateful_metric_functions: + metric_results.append(m.result()) + return metric_results + + def _check_trainable_weights_consistency(self): + """Check trainable weights count consistency. + + This will raise a warning if `trainable_weights` and + `_collected_trainable_weights` are inconsistent (i.e. have different + number of parameters). + Inconsistency will typically arise when one modifies `model.trainable` + without calling `model.compile` again. + """ + if not hasattr(self, '_collected_trainable_weights'): + return + + if len(self.trainable_weights) != len(self._collected_trainable_weights): + logging.log_first_n( + logging.WARN, 'Discrepancy between trainable weights and collected' + ' trainable weights, did you set `model.trainable`' + ' without calling `model.compile` after ?', 1) + + def _make_train_function_helper(self, fn_name, outputs, metric_updates=None): + if not hasattr(self, fn_name): + raise RuntimeError('You must compile your model before using it.') + self._check_trainable_weights_consistency() + if getattr(self, fn_name) is None: + inputs = (self._feed_inputs + + self._feed_targets + + self._feed_sample_weights) + if not isinstance(K.symbolic_learning_phase(), int): + inputs += [K.symbolic_learning_phase()] + + with K.get_graph().as_default(): + with K.name_scope('training'): + with K.name_scope(self.optimizer.__class__.__name__): + # Training updates + updates = self.optimizer.get_updates( + params=self._collected_trainable_weights, loss=self.total_loss) + # Unconditional updates + updates += self.get_updates_for(None) + # Conditional updates relevant to this model + updates += self.get_updates_for(self.inputs) + # Add stateful metrics updates. + if metric_updates is not None: + updates += metric_updates + + with K.name_scope('training'): + # Gets loss and metrics. Updates weights at each call. 
+ fn = K.function( + inputs, + outputs, + updates=updates, + name='train_function', + **self._function_kwargs) + setattr(self, fn_name, fn) + + def _make_train_function(self): + metrics_tensors = [ + self._all_metrics_tensors[m] for m in self.metrics_names[1:] + ] + self._make_train_function_helper('train_function', + [self.total_loss] + metrics_tensors) + + def _make_fit_function(self): + metrics_tensors = [ + self._all_stateful_metrics_tensors[m] for m in self.metrics_names[1:] + ] + self._make_train_function_helper( + '_fit_function', [self.total_loss] + metrics_tensors) + + def _make_test_function_helper(self, fn_name, outputs, metric_updates=None): + if not hasattr(self, fn_name): + raise RuntimeError('You must compile your model before using it.') + if getattr(self, fn_name) is None: + inputs = (self._feed_inputs + + self._feed_targets + + self._feed_sample_weights) + + with K.name_scope('evaluation'): + updates = self.state_updates + # Add stateful metrics updates. + if metric_updates is not None: + updates += metric_updates + # Return loss and metrics, no gradient updates. + # Does update the network states. + fn = K.function( + inputs, + outputs, + updates=updates, + name='test_function', + **self._function_kwargs) + setattr(self, fn_name, fn) + + def _make_test_function(self): + metrics_tensors = [ + self._all_metrics_tensors[m] for m in self.metrics_names[1:] + ] + self._make_test_function_helper('test_function', + [self.total_loss] + metrics_tensors) + + def _make_eval_function(self): + metrics_tensors = [ + self._all_stateful_metrics_tensors[m] for m in self.metrics_names[1:] + ] + self._make_test_function_helper( + '_eval_function', [self.total_loss] + metrics_tensors) + + def _make_predict_function(self): + if not hasattr(self, 'predict_function'): + self.predict_function = None + if self.predict_function is None: + inputs = self._feed_inputs + # Gets network outputs. Does not update weights. + # Does update the network states. + kwargs = getattr(self, '_function_kwargs', {}) + with K.name_scope('predict'): + self.predict_function = K.function( + inputs, + self.outputs, + updates=self.state_updates, + name='predict_function', + **kwargs) + + def _make_execution_function(self, mode): + if mode == 'train': + self._make_fit_function() + return self._fit_function + if mode == 'test': + self._make_eval_function() + return self._eval_function + if mode == 'predict': + self._make_predict_function() + return self.predict_function + + def _get_iterator_get_next_tensors(self, iterator): + get_next_op = self._iterator_get_next.get(iterator, None) + if get_next_op is None: + get_next_op = iterator.get_next() + self._iterator_get_next[iterator] = get_next_op + return get_next_op + + def _distribution_standardize_user_data(self, + x, + y=None, + sample_weight=None, + class_weight=None, + batch_size=None, + check_steps=False, + steps_name='steps', + steps=None, + validation_split=0, + shuffle=False): + """Runs validation checks on input and target data passed by the user. + + This is called when using DistributionStrategy to train, evaluate or serve + the model. + + Args: + x: Input data. A numpy array or `tf.data` dataset. + y: Target data. A numpy array or None if x is a `tf.data` dataset. + sample_weight: An optional sample-weight array passed by the user to + weight the importance of each sample in `x`. + class_weight: An optional class-weight array by the user to + weight the importance of samples in `x` based on the class they belong + to, as conveyed by `y`. 
+ batch_size: Integer batch size. If provided, it is used to run additional + validation checks on stateful models. + check_steps: boolean, True if we want to check for validity of `steps` and + False, otherwise. + steps_name: The public API's parameter name for `steps`. + steps: Integer or `None`. Total number of steps (batches of samples) to + execute. + validation_split: Float between 0 and 1. + Fraction of the training data to be used as validation data. + shuffle: Boolean whether to shuffle the training data before each epoch. + + Returns: + Iterator for reading the dataset `x`. + + Raises: + ValueError: In case of invalid user-provided data. + RuntimeError: If the model was never compiled. + """ + if class_weight: + raise NotImplementedError('`class_weight` is currently not supported ' + 'when using DistributionStrategy.') + + if (sample_weight is not None and sample_weight.all() and + distributed_training_utils.is_tpu_strategy( + self._distribution_strategy)): + raise NotImplementedError('`sample_weight` is currently not supported ' + 'when using TPUStrategy.') + + # Validates `steps` argument right at the beginning since we use it to + # construct the dataset object. + # TODO(anjalisridhar): Remove this check once we refactor the + # _standardize_user_data code path. This check is already present elsewhere + # in the codebase. + if check_steps and isinstance(x, dataset_ops.DatasetV2) and steps is None: + raise ValueError('When using Datasets as input, ' + 'you should specify the `{steps_name}` argument.' + .format(steps_name=steps_name)) + + first_x_value = nest.flatten(x)[0] + if isinstance(first_x_value, np.ndarray): + # We need to use the drop_remainder argument to allow for a static + # input shape which is required for TPUs. + drop_remainder = self._distribution_strategy.require_static_shapes + if y is not None: + var_x = distributed_training_utils.get_var_for_numpy( + self._distribution_strategy, x) + var_y = distributed_training_utils.get_var_for_numpy( + self._distribution_strategy, y) + if sample_weight is not None: + var_sample_weights = distributed_training_utils.get_var_for_numpy( + self._distribution_strategy, sample_weight) + + x = dataset_ops.Dataset.from_tensor_slices((var_x, var_y, + var_sample_weights)) + else: + x = dataset_ops.Dataset.from_tensor_slices((var_x, var_y)) + + x = dataset_ops.Dataset.from_tensor_slices((var_x, var_y)) + if shuffle: + # 1024 is a good buffer size since it is much larger than the average + # batch size provided by the user and provides sufficient randomness. + # One thing to keep in mind is the memory usage based on the size of + # each sample. + x = x.shuffle(1024) + x = x.repeat() + x = x.batch(batch_size, drop_remainder=drop_remainder) + y = None + sample_weight = None + else: + # This case is for the predict call where the dataset only contains + # inputs and no targets, i.e. 
+        var_x = distributed_training_utils.get_var_for_numpy(
+            self._distribution_strategy, x)
+        x = dataset_ops.Dataset.from_tensor_slices(var_x)
+        x = x.batch(batch_size, drop_remainder=drop_remainder)
+
+    assert isinstance(x, dataset_ops.DatasetV2)
+
+    with self._distribution_strategy.scope():
+      iterator = self._distribution_strategy.make_dataset_iterator(x)
+      init_op = iterator.initialize()
+      if not context.executing_eagerly():
+        K.get_session().run(init_op)
+
+    training_utils.validate_iterator_input(x, y, sample_weight,
+                                           validation_split)
+    return iterator
+
+  def _standardize_user_data(self,
+                             x,
+                             y=None,
+                             sample_weight=None,
+                             class_weight=None,
+                             batch_size=None,
+                             check_steps=False,
+                             steps_name='steps',
+                             steps=None,
+                             validation_split=0,
+                             shuffle=False):
+    """Runs validation checks on input and target data passed by the user.
+
+    Also standardizes the data to lists of arrays, in order.
+
+    Also builds and compiles the model on the fly if it is a subclassed model
+    that has never been called before (and thus has no inputs/outputs).
+
+    This is a purely internal method, subject to refactoring at any time.
+
+    Args:
+      x: Input data. It could be:
+        - A Numpy array (or array-like), or a list of arrays
+          (in case the model has multiple inputs).
+        - A TensorFlow tensor, or a list of tensors
+          (in case the model has multiple inputs).
+        - A dict mapping input names to the corresponding array/tensors,
+          if the model has named inputs.
+        - A `tf.data` dataset or a dataset iterator.
+      y: Target data. Like the input data `x`,
+        it could be either Numpy array(s) or TensorFlow tensor(s).
+        It should be consistent with `x` (you cannot have Numpy inputs and
+        tensor targets, or inversely). If `x` is a dataset or a
+        dataset iterator, `y` should not be specified
+        (since targets will be obtained from the iterator).
+      sample_weight: An optional sample-weight array passed by the user to
+        weight the importance of each sample in `x`.
+      class_weight: An optional class-weight array passed by the user to
+        weight the importance of samples in `x` based on the class they belong
+        to, as conveyed by `y`.
+      batch_size: Integer batch size. If provided, it is used to run additional
+        validation checks on stateful models.
+      check_steps: boolean, True if we want to check for validity of `steps` and
+        False, otherwise. For example, when we are standardizing one batch of
+        data for train_on_batch/predict_on_batch/test_on_batch APIs, `steps`
+        value is not required and we should not check for its validity in these
+        cases.
+      steps_name: The public API's parameter name for `steps`.
+      steps: Integer or `None`. Total number of steps (batches of samples) to
+        execute.
+      validation_split: Float between 0 and 1.
+        Fraction of the training data to be used as validation data.
+      shuffle: Boolean whether to shuffle the training data before each epoch.
+
+    Returns:
+      A tuple of 3: inputs (arrays or dicts, depending on whether `x` was a dict
+      or not), target arrays, sample-weight arrays.
+      If the model's input and targets are symbolic, these lists are empty
+      (since the model takes no user-provided data, instead the data comes
+      from the symbolic inputs/targets).
+
+    Raises:
+      ValueError: In case of invalid user-provided data.
+      RuntimeError: If the model was never compiled.
+ """ + if self._distribution_strategy: + iterator = self._distribution_standardize_user_data( + x, + y, + sample_weight=sample_weight, + class_weight=class_weight, + batch_size=batch_size, + check_steps=check_steps, + steps_name=steps_name, + steps=steps, + validation_split=validation_split, + shuffle=shuffle) + return iterator, None, None + + if isinstance(x, dataset_ops.DatasetV2): + if context.executing_eagerly(): + x = iter(x) + else: + if x in self._dataset_iterator_cache: + x = self._dataset_iterator_cache[x] + else: + iterator = dataset_ops.make_initializable_iterator(x) + self._dataset_iterator_cache[x] = iterator + x = iterator + K.get_session().run(x.initializer) + + # Validates `steps` argument based on x's type. + if check_steps: + training_utils.check_steps_argument(x, steps, steps_name) + + is_x_eager_iterator = isinstance(x, iterator_ops.EagerIterator) + is_x_iterator = isinstance(x, iterator_ops.Iterator) + + # Validate user inputs when data is given as a dataset or dataset iterator. + if is_x_iterator or is_x_eager_iterator: + training_utils.validate_iterator_input(x, y, sample_weight, + validation_split) + + # For eager iterators, when we have to process multiple batches of samples, + # we will standardize the data when we actually loop over iterator and get + # the batches. For now, we just return the iterator as is. + if is_x_eager_iterator: + return x, y, sample_weight + + # If input data is a dataset iterator in graph mode or if it is an eager + # iterator and only one batch of samples is required, we fetch the data + # tensors from the iterator and then standardize them. + if is_x_iterator or is_x_eager_iterator: + try: + if is_x_iterator: + next_element = self._get_iterator_get_next_tensors(x) + else: + next_element = x.get_next() + except errors.OutOfRangeError: + raise RuntimeError('Your dataset iterator ran out of data; ' + 'Make sure that your dataset can generate ' + 'required number of samples.') + + if isinstance(next_element, (list, tuple)): + if len(next_element) not in [2, 3]: + raise ValueError( + 'Please provide model inputs as a list or tuple of 2 or 3' + 'elements: (input, target) or (input, target, sample_weights)' + 'Received %s' % next_element) + if len(next_element) == 2: + x, y = next_element + else: + x, y, sample_weight = next_element + else: + x = next_element + + if sample_weight is not None and class_weight is not None: + logging.warning( + 'Received both a `sample_weight` and `class_weight` argument. ' + 'The `class_weight` argument will be ignored.') + # First, we build/compile the model on the fly if necessary. + all_inputs = [] + is_build_called = False + is_compile_called = False + # Whether this is a subclassed model that expects dictionary inputs + # rather than list inputs (e.g. FeatureColumn-based models). + dict_inputs = False + if not self.inputs: + # We need to use `x` to set the model inputs. + # We type-check that `x` and `y` are either single arrays + # or lists of arrays. + if isinstance(x, (list, tuple)): + if not all(isinstance(v, np.ndarray) or + tensor_util.is_tensor(v) for v in x): + raise ValueError('Please provide as model inputs either a single ' + 'array or a list of arrays. You passed: x=' + str(x)) + all_inputs += list(x) + elif isinstance(x, dict): + dict_inputs = True + keys = sorted(x.keys()) + all_inputs = [x[k] for k in keys] + else: + if not isinstance(x, np.ndarray) and not tensor_util.is_tensor(x): + raise ValueError('Please provide as model inputs either a single ' + 'array or a list of arrays. 
+        all_inputs.append(x)
+
+    # Build the model using the retrieved inputs (value or symbolic).
+    # If values were passed or generated from a dataset, then in symbolic mode
+    # placeholders will be created to match the value shapes.
+    if not self.inputs:
+      is_build_called = True
+      if is_x_iterator:
+        cast_inputs = nest.map_structure(lambda v: v.shape, x)
+      elif training_utils.has_tensors(x):
+        cast_inputs = training_utils.cast_if_floating_dtype(x)
+      else:
+        cast_inputs = x
+      self._set_inputs(cast_inputs)
+    else:
+      dict_inputs = isinstance(self.inputs, dict)
+    if dict_inputs and context.executing_eagerly():
+      # No support for graph functions when the model expects dictionary inputs
+      # (i.e. FeatureColumn-based models).
+      self.run_eagerly = True
+
+    if y is not None:
+      if not self.optimizer:
+        raise RuntimeError('You must compile a model before '
+                           'training/testing. '
+                           'Use `model.compile(optimizer, loss)`.')
+      if not self._is_compiled:
+        # On-the-fly compilation of the model.
+        # We need to use `y` to set the model targets.
+        if training_utils.has_tensors(y):
+          y = training_utils.cast_if_floating_dtype(y)
+        if isinstance(y, (list, tuple)):
+          if not all(isinstance(v, np.ndarray) or
+                     tensor_util.is_tensor(v) for v in y):
+            raise ValueError('Please provide as model targets either a single '
+                             'array or a list of arrays. '
+                             'You passed: y=' + str(y))
+          all_inputs += list(y)
+        elif isinstance(y, dict):
+          raise ValueError('Please do not pass a dictionary as model targets.')
+        else:
+          if not isinstance(y, np.ndarray) and not tensor_util.is_tensor(y):
+            raise ValueError('Please provide as model targets either a single '
+                             'array or a list of arrays. '
+                             'You passed: y=' + str(y))
+          all_inputs.append(y)
+
+        # Typecheck that all inputs are *either* value *or* symbolic.
+        # TODO(fchollet): this check could be removed in Eager mode?
+        if any(tensor_util.is_tensor(v) for v in all_inputs):
+          if not all(tensor_util.is_tensor(v) for v in all_inputs):
+            raise ValueError('Do not pass inputs that mix Numpy arrays and '
+                             'TensorFlow tensors. '
+                             'You passed: x=' + str(x) + '; y=' + str(y))
+
+        if self.run_eagerly or is_x_iterator:
+          target_tensors = None
+        else:
+          # Handle target tensors if any passed.
+          if not isinstance(y, (list, tuple)):
+            y = [y]
+          target_tensors = [v for v in y if _is_symbolic_tensor(v)]
+        is_compile_called = True
+        self.compile(
+            optimizer=self.optimizer,
+            loss=self.loss,
+            metrics=self._compile_metrics,
+            weighted_metrics=self._compile_weighted_metrics,
+            loss_weights=self.loss_weights,
+            target_tensors=target_tensors,
+            run_eagerly=self.run_eagerly)
+
+    # In graph mode, if we had just set inputs and targets as symbolic tensors
+    # by invoking build and compile on the model respectively, we do not have to
+    # feed anything to the model. Model already has input and target data as
+    # part of the graph.
+    # Note: in this case, `any` and `all` are equivalent since we disallow
+    # mixed symbolic/value inputs.
+    if (not self.run_eagerly and is_build_called and is_compile_called and
+        not is_x_iterator and any(_is_symbolic_tensor(v) for v in all_inputs)):
+      return [], [], []
+
+    # What follows is input validation and standardization to list format,
+    # in the case where all inputs are value arrays.
+
+    if self.run_eagerly:
+      # In eager mode, do not do shape validation
+      # since the network has no input nodes (placeholders) to be fed.
+      feed_input_names = self.input_names
+      feed_input_shapes = None
+    elif not self._is_graph_network:
+      # Case: symbolic-mode subclassed network. Do not do shape validation.
+      feed_input_names = self._feed_input_names
+      feed_input_shapes = None
+    else:
+      # Case: symbolic-mode graph network.
+      # In this case, we run extensive shape validation checks.
+      feed_input_names = self._feed_input_names
+      feed_input_shapes = self._feed_input_shapes
+
+    # Standardize the inputs.
+    x = training_utils.standardize_input_data(
+        x,
+        feed_input_names,
+        feed_input_shapes,
+        check_batch_axis=False,  # Don't enforce the batch size.
+        exception_prefix='input')
+
+    if y is not None:
+      if not self._is_graph_network:
+        feed_output_names = self._feed_output_names
+        feed_output_shapes = None
+        # Sample weighting not supported in this case.
+        # TODO(fchollet): consider supporting it.
+        feed_sample_weight_modes = [None for _ in self.outputs]
+      else:
+        feed_output_names = self._feed_output_names
+        feed_sample_weight_modes = self._feed_sample_weight_modes
+        feed_output_shapes = []
+        for output_shape, loss_fn in zip(self._feed_output_shapes,
+                                         self._feed_loss_fns):
+          if loss_fn is losses.sparse_categorical_crossentropy:
+            if K.image_data_format() == 'channels_first':
+              feed_output_shapes.append(
+                  (output_shape[0], 1) + output_shape[2:])
+            else:
+              feed_output_shapes.append(output_shape[:-1] + (1,))
+          elif (not hasattr(loss_fn, '__name__') or
+                getattr(losses, loss_fn.__name__, None) is None):
+            # If `loss_fn` is not a function (e.g. callable class)
+            # or if it is not in the `losses` module, then
+            # it is a user-defined loss and we make no assumptions
+            # about it.
+            feed_output_shapes.append(None)
+          else:
+            feed_output_shapes.append(output_shape)
+
+      # Standardize the outputs.
+      y = training_utils.standardize_input_data(
+          y,
+          feed_output_names,
+          # Don't enforce target shapes to match output shapes.
+          # Precise checks will be run in `check_loss_and_target_compatibility`.
+          shapes=None,
+          check_batch_axis=False,  # Don't enforce the batch size.
+          exception_prefix='target')
+
+      # Generate sample-wise weight values given the `sample_weight` and
+      # `class_weight` arguments.
+      sample_weights = training_utils.standardize_sample_weights(
+          sample_weight, feed_output_names)
+      class_weights = training_utils.standardize_class_weights(
+          class_weight, feed_output_names)
+      sample_weights = [
+          training_utils.standardize_weights(ref, sw, cw, mode)
+          for (ref, sw, cw, mode) in zip(y, sample_weights, class_weights,
+                                         feed_sample_weight_modes)
+      ]
+      # Check that all arrays have the same length.
+      if not self._distribution_strategy:
+        training_utils.check_array_lengths(x, y, sample_weights)
+        if self._is_graph_network and not self.run_eagerly:
+          # Additional checks to avoid users mistakenly using improper loss fns.
+          training_utils.check_loss_and_target_compatibility(
+              y, self._feed_loss_fns, feed_output_shapes)
+    else:
+      y = []
+      sample_weights = []
+
+    if self.stateful and batch_size:
+      # Check that for stateful networks, number of samples is a multiple
+      # of the static batch size.
+      if x[0].shape[0] % batch_size != 0:
+        raise ValueError('In a stateful network, '
+                         'you should only pass inputs with '
+                         'a number of samples that can be '
+                         'divided by the batch size. Found: ' +
+                         str(x[0].shape[0]) + ' samples')
+
+    # If dictionary inputs were provided, we return a dictionary as well.
+    if dict_inputs:
+      x = dict(zip(feed_input_names, x))
+    return x, y, sample_weights
+
+  @checkpointable.no_automatic_dependency_tracking
+  def _set_inputs(self, inputs, outputs=None, training=None):
+    """Set model's input and output specs based on the input data received.
+
+    This is to be used for Model subclasses, which do not know at instantiation
+    time what their inputs look like.
+
+    Args:
+      inputs: Single array, or list of arrays. The arrays could be placeholders,
+        Numpy arrays, data tensors, or TensorShapes.
+        - if placeholders: the model is built on top of these placeholders,
+          and we expect Numpy data to be fed for them when calling `fit`/etc.
+        - if Numpy data or TensorShapes: we create placeholders matching the
+          TensorShapes or shapes of the Numpy arrays. We expect Numpy data to be
+          fed for these placeholders when calling `fit`/etc.
+        - if data tensors: the model is built on top of these tensors.
+          We do not expect any Numpy data to be provided when calling `fit`/etc.
+      outputs: None, a data tensor, or a list of tensors. If None, the
+        outputs will be determined by invoking `self.call()`, otherwise the
+        provided value will be used.
+      training: Boolean or None. Only relevant in symbolic mode. Specifies
+        whether to build the model's graph in inference mode (False), training
+        mode (True), or using the Keras learning phase (None).
+    Raises:
+      ValueError: If dict inputs are passed to a Sequential Model where the
+        first layer isn't FeatureLayer.
+    """
+    if self.inputs:
+      raise ValueError('Model inputs are already set.')
+
+    if self.__class__.__name__ == 'Sequential' and not self.built:
+      if tensor_util.is_tensor(inputs):
+        input_shape = (None,) + tuple(inputs.shape.as_list()[1:])
+      elif isinstance(inputs, tensor_shape.TensorShape):
+        input_shape = (None,) + tuple(inputs.as_list()[1:])
+      elif isinstance(inputs, dict):
+        # We assert that the first layer is a FeatureLayer.
+        if not training_utils.is_feature_layer(self.layers[0]):
+          raise ValueError('Passing a dictionary input to a Sequential Model '
+                           'which doesn\'t have FeatureLayer as the first layer'
+                           ' is an error.')
+        input_shape = (None,)
+      else:
+        input_shape = (None,) + tuple(inputs.shape[1:])
+      self._build_input_shape = input_shape
+
+    # On-the-fly setting of symbolic model inputs (either by using the tensor
+    # provided, or by creating a placeholder if Numpy data was provided).
+    model_inputs = training_utils.ModelInputs(inputs)
+    inputs = model_inputs.get_symbolic_inputs()
+    self.inputs = model_inputs.get_symbolic_inputs(return_single_as_list=True)
+    self.input_names = model_inputs.get_input_names()
+
+    self._feed_inputs = []
+    self._feed_input_names = []
+    self._feed_input_shapes = []
+
+    for k, v in model_inputs.as_dict():
+      if K.is_placeholder(v):
+        self._feed_inputs.append(v)
+        self._feed_input_names.append(k)
+        self._feed_input_shapes.append(K.int_shape(v))
+
+    # TODO(fchollet): consider calling `_maybe_build` before calling the model.
+
+    if outputs is None:
+      # Obtain symbolic outputs by calling the model.
+      with K.get_graph().as_default():
+        if self._expects_training_arg:
+          outputs = self.call(inputs, training=training)
+        else:
+          outputs = self.call(inputs)
+
+    outputs = nest.flatten(outputs)
+    self.outputs = outputs
+    self.output_names = training_utils.generic_output_names(outputs)
+    self.built = True
+
 class DistributedCallbackModel(Model):
   """Model that is used for callbacks with DistributionStrategy."""
diff --git a/tensorflow/python/keras/engine/training_distributed.py b/tensorflow/python/keras/engine/training_distributed.py
index d20d092d8e6..ffb0266911e 100644
--- a/tensorflow/python/keras/engine/training_distributed.py
+++ b/tensorflow/python/keras/engine/training_distributed.py
@@ -570,11 +570,8 @@ def _get_input_from_iterator(iterator, model):
   # Validate that all the elements in x and y are of the same type and shape.
   # We can then pass the first element of x and y to `_standardize_weights`
   # below and be confident of the output.
-  x_values, y_values, sample_weights_values = distributed_training_utils.\
-      validate_distributed_dataset_inputs(model._distribution_strategy, x, y,
-                                          sample_weights)
-  model._standardize_weights(x_values, y_values,
-                             sample_weight=sample_weights_values)
+  distributed_training_utils.validate_distributed_dataset_inputs(
+      model._distribution_strategy, x, y, sample_weights)
   return x, y, sample_weights
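The `_make_train_function_helper`, `_make_test_function_helper` and `_make_predict_function` methods in this patch all follow the same lazy build-and-cache pattern: if the cached function attribute is still `None`, build a `K.function` once, store it on the model, and let `_make_execution_function(mode)` hand back the cached callable for 'train', 'test' or 'predict'. Below is a minimal, self-contained sketch of that pattern using plain-Python stand-ins for the Keras internals; the `ExecutionFunctionCache` class and its builder callables are hypothetical names chosen for illustration, not Keras APIs.

# Sketch of the lazy build-and-cache dispatch mirrored by _make_execution_function.
# Plain Python only; the builders stand in for the K.function factories.
class ExecutionFunctionCache(object):

  def __init__(self, build_train_fn, build_eval_fn, build_predict_fn):
    # One builder per mode; each builder returns a callable when invoked.
    self._builders = {
        'train': build_train_fn,
        'test': build_eval_fn,
        'predict': build_predict_fn,
    }
    self._functions = {}

  def get(self, mode):
    # Build the callable on first use and cache it, the way
    # _make_execution_function() calls _make_fit_function() /
    # _make_eval_function() / _make_predict_function() and then returns
    # the cached attribute.
    if mode not in self._builders:
      raise ValueError('Unknown mode: %s' % mode)
    if mode not in self._functions:
      self._functions[mode] = self._builders[mode]()
    return self._functions[mode]


if __name__ == '__main__':
  cache = ExecutionFunctionCache(
      build_train_fn=lambda: (lambda batch: 'train step on %r' % (batch,)),
      build_eval_fn=lambda: (lambda batch: 'eval step on %r' % (batch,)),
      build_predict_fn=lambda: (lambda batch: 'predict on %r' % (batch,)))
  fit_fn = cache.get('train')
  print(fit_fn([0.1, 0.2]))
  print(cache.get('train') is fit_fn)  # True: built once, then reused.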
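`_standardize_user_data` also documents a round-trip for dict inputs: named arrays are flattened into a list ordered by sorted key for internal processing (`keys = sorted(x.keys())`), and are packed back into a dict just before returning (`x = dict(zip(feed_input_names, x))`). The following small sketch shows that ordering contract, assuming NumPy inputs and hypothetical helper names.

import numpy as np


def flatten_dict_inputs(x):
  # Mirrors `keys = sorted(x.keys()); all_inputs = [x[k] for k in keys]`.
  feed_input_names = sorted(x.keys())
  all_inputs = [np.asarray(x[k]) for k in feed_input_names]
  return feed_input_names, all_inputs


def repack_as_dict(feed_input_names, arrays):
  # Mirrors the final `x = dict(zip(feed_input_names, x))` step.
  return dict(zip(feed_input_names, arrays))


if __name__ == '__main__':
  raw = {'income': [40000.0, 52000.0], 'age': [21, 35]}
  names, arrays = flatten_dict_inputs(raw)
  print(names)                                 # ['age', 'income']
  print(repack_as_dict(names, arrays)['age'])  # [21 35]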