diff --git a/tensorflow/python/distribute/parameter_server_strategy_v2.py b/tensorflow/python/distribute/parameter_server_strategy_v2.py
index 7cb246623da..61e2e730603 100644
--- a/tensorflow/python/distribute/parameter_server_strategy_v2.py
+++ b/tensorflow/python/distribute/parameter_server_strategy_v2.py
@@ -264,51 +264,75 @@ class ParameterServerStrategyV2(distribute_lib.Strategy):
   __Variable partitioning__

   Having dedicated servers to store variables means being able to divide up, or
-  "shard" the variables across the ps. Large embeddings that would otherwise
-  exceed memory limit of a single machine can be used in a cluster with enough
-  number of ps.
+  "shard" the variables across the ps. Partitioning large variables among ps
+  is a commonly used technique to boost training throughput and mitigate
+  memory constraints. It enables parallel computations and updates on
+  different shards of a variable, and often yields better load balancing
+  across parameter servers. Without sharding, models with large variables
+  (e.g., embeddings) that can't fit into one machine's memory would otherwise
+  be unable to train.

   With `tf.distribute.experimental.ParameterServerStrategy`, if a
   `variable_partitioner` is provided to `__init__` and certain conditions are
   satisfied, the resulting variables created in scope are sharded across the
   parameter servers, in a round-robin fashion. The variable reference returned
   from `tf.Variable` becomes a type that serves as the container of the sharded
-  variables. Access `variables` attribute of this container for the actual
-  variable components. See arguments section of
-  `tf.distribute.experimental.ParameterServerStrategy.__init__` for more
-  information.
+  variables. One can access the `variables` attribute of this container for
+  the actual variable components. If building a model with `tf.Module` or
+  Keras, the variable components are collected in the `variables`-like
+  attributes.

-  To initialize the sharded variables in a more memory-efficient way, use an
-  initializer whose `__call__` accepts a `shard_info` argument, and use
-  `shard_info.offset` and `shard_info.shape` to create and return a
-  partition-aware `tf.Tensor` to initialize the variable components.

   ```python
-  class PartitionAwareIdentity(object):
+  class Dense(tf.Module):
+    def __init__(self, name=None):
+      super().__init__(name=name)
+      self.w = tf.Variable(tf.random.normal([100, 10]), name='w')

-    def __call__(self, shape, dtype, shard_info):
-      value = tf.eye(*shape, dtype=dtype)
-      if shard_info is not None:
-        value = tf.slice(value, shard_info.offset, shard_info.shape)
-      return value
+    def __call__(self, x):
+      return x * self.w

-  cluster_resolver = ...
-  strategy = tf.distribute.experimental.ParameterServerStrategy(
-      cluster_resolver, tf.fixed_size_partitioner(2))
+  # Partition the dense layer into 2 shards.
+  variable_partitioner = (
+      tf.distribute.experimental.partitioners.FixedShardsPartitioner(
+          num_shards=2))
+  strategy = tf.distribute.experimental.ParameterServerStrategy(
+      cluster_resolver=...,
+      variable_partitioner=variable_partitioner)
   with strategy.scope():
-    initializer = PartitionAwareIdentity()
-    initial_value = functools.partial(initializer, shape=(4, 4), dtype=tf.int64)
-    v = tf.Variable(
-        initial_value=initial_value, shape=(4, 4), dtype=tf.int64)
-
-  # `v.variables` gives the actual variable components.
-  assert len(v.variables) == 2
-  assert v.variables[0].device == "/job:ps/replica:0/task:0/device:CPU:0"
-  assert v.variables[1].device == "/job:ps/replica:0/task:1/device:CPU:0"
-  assert np.array_equal(v.variables[0].numpy(), [[1, 0, 0, 0], [0, 1, 0, 0]])
-  assert np.array_equal(v.variables[1].numpy(), [[0, 0, 1, 0], [0, 0, 0, 1]])
+    dense = Dense()
+  assert len(dense.variables) == 2
+  assert isinstance(dense.variables[0], tf.Variable)
+  assert isinstance(dense.variables[1], tf.Variable)
+  assert dense.variables[0].name == "w/part_0"
+  assert dense.variables[1].name == "w/part_1"
   ```

+  The sharded variable container can be converted to a `Tensor` via
+  `tf.convert_to_tensor`. This means the container can be directly used in
+  most Python ops, where such `Tensor` conversion happens automatically. For
+  example, in the above code snippet, `x * self.w` implicitly applies the said
+  tensor conversion. Note that such conversion can be expensive, as the
+  variable components need to be transferred from multiple parameter servers
+  to where the value is used.
+
+  `tf.nn.embedding_lookup`, on the other hand, doesn't apply the tensor
+  conversion, and instead performs parallel lookups on the variable
+  components. This is crucial for scaling up embedding lookups when the
+  embedding table variable is large.
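+
+  For illustration, here is a minimal sketch of both access patterns, reusing
+  `dense` from the snippet above (the ids and the commented shapes are made up
+  for this example):
+
+  ```python
+  # Implicit conversion: every shard is fetched from its parameter server and
+  # concatenated into one dense Tensor on the caller.
+  t = tf.convert_to_tensor(dense.w)  # shape [100, 10]
+
+  # Parallel lookup: each row is fetched only from the shard that owns it, so
+  # the full table is never materialized in one place.
+  ids = tf.constant([3, 57, 99])
+  rows = tf.nn.embedding_lookup(dense.w, ids)  # shape [3, 10]
+  ```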
+
+  When a partitioned variable is saved to a `SavedModel`, it is saved as if it
+  were one single variable. This improves serving efficiency by eliminating a
+  number of ops that handle the partition aspects.
+
+  Known limitations of variable partitioning:
+
+  * The number of partitions must not change across checkpoint save/load.
+
+  * After saving partitioned variables to a SavedModel, the SavedModel can't
+    be loaded via `tf.saved_model.load`.
+
+  * Partitioned variables don't directly work with `tf.GradientTape`; please
+    use the `variables` attribute to get the actual variable components and
+    use them in gradient APIs instead, as in the sketch after this list.
+
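+  One way to apply that workaround, sketched with a made-up scalar loss (only
+  the use of `dense.variables` as the gradient sources matters here):
+
+  ```python
+  with tf.GradientTape() as tape:
+    # Compute the loss from the concrete per-shard components rather than
+    # from the container returned by `tf.Variable`.
+    loss = tf.add_n([tf.reduce_sum(part) for part in dense.variables])
+
+  # One gradient is returned per component.
+  grads = tape.gradient(loss, dense.variables)
+  assert len(grads) == len(dense.variables)
+  ```
+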
   __Dataset preparation__

   With `tf.distribute.experimental.ParameterServerStrategy`, a dataset is
@@ -367,37 +391,34 @@ class ParameterServerStrategyV2(distribute_lib.Strategy):
       cluster_resolver: a `tf.distribute.cluster_resolver.ClusterResolver`
         object.
       variable_partitioner:
-        a callable with the signature `num_partitions = fn(shape, dtype)`, where
-        `num_partitions` is a list/tuple representing the number of partitions
-        on each axis, and `shape` and `dtype` are of types `tf.TensorShape` and
-        `tf.dtypes.Dtype`. If `None`, variables will not be partitioned.
+        a `tf.distribute.experimental.partitioners.Partitioner` that specifies
+        how to partition variables. If `None`, variables will not be
+        partitioned.

-        * `variable_partitioner` will be called for all variables created under
-          strategy `scope` to instruct how the variables should be partitioned.
-          Variables will be created in multiple partitions if there are more than
-          one partition along the partitioning axis, otherwise it falls back to
-          normal `tf.Variable`.
+        * Predefined partitioners in `tf.distribute.experimental.partitioners`
+          can be used for this argument. A commonly used partitioner is
+          `MinSizePartitioner(min_shard_bytes=256 << 10, max_shards=num_ps)`,
+          which allocates at least 256 KiB per shard while capping the number
+          of shards at `num_ps`, so each ps gets at most one shard.

-        * Only the first / outermost axis partitioning is supported, namely,
-          elements in `num_partitions` must be 1 other than the first element.
+        * `variable_partitioner` will be called for each variable created under
+          the strategy's `scope` to instruct how that variable should be
+          partitioned. Variables that have only one partition along the
+          partitioning axis (i.e., that don't need partitioning) will be
+          created as normal `tf.Variable`s.

-        * Partitioner like `tf.compat.v1.min_max_variable_partitioner`,
-          `tf.compat.v1.variable_axis_size_partitioner` and
-          `tf.compat.v1.fixed_size_partitioner` are also supported since they
-          conform to the required signature.
+        * Only partitioning along the first / outermost axis is supported.

-        * Div partition
-          strategy is used to partition variables. Assuming we assign consecutive
-          integer ids along the first axis of a variable, then ids are assigned to
-          shards in a contiguous manner, while attempting to keep each shard size
-          identical. If the ids do not evenly divide the number of shards, each of
-          the first several shards will be assigned one more id. For instance, a
-          variable whose first dimension is 13 has 13 ids, and they are split
-          across 5 shards as:
+        * A div partition strategy is used to partition variables. Assuming we
+          assign consecutive integer ids along the first axis of a variable,
+          ids are assigned to shards in a contiguous manner while attempting to
+          keep each shard size identical. If the ids do not evenly divide the
+          number of shards, each of the first several shards is assigned one
+          more id. For instance, a variable whose first dimension is 13 has 13
+          ids, and they are split across 5 shards as:
           `[[0, 1, 2], [3, 4, 5], [6, 7, 8], [9, 10], [11, 12]]`.

         * Variables created under `strategy.extended.colocate_vars_with` will
-          not be partitioned, e.g, optimizer's slot variables.
+          not be partitioned.
     """
     # pyformat: enable
     self._cluster_resolver = cluster_resolver