PSv2: Docstring minor rephrase and typo/example corrections.

PiperOrigin-RevId: 339929158
Change-Id: I8592aa6e2cec32a2ba97743a6f022f263f0f65e2
Rick Chao 2020-10-30 13:25:03 -07:00 committed by TensorFlower Gardener
parent 60ac36f504
commit 84384703c0
2 changed files with 42 additions and 43 deletions


@@ -291,11 +291,11 @@ class PerWorkerValues(object):
"""A container that holds a list of values, one value per worker.

`tf.distribute.experimental.coordinator.PerWorkerValues` contains a collection
of values, where each of the values is located on its corresponding worker,
and upon being used as one of the `args` or `kwargs` of
`tf.distribute.experimental.coordinator.ClusterCoordinator.schedule()`, the
value specific to a worker will be passed into the function being executed at
that corresponding worker.

Currently, the only supported path to create an object of
`tf.distribute.experimental.coordinator.PerWorkerValues` is through calling
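For context, a hedged sketch (not part of this change) of how a `PerWorkerValues` typically surfaces in user code; `coordinator`, `dataset_fn`, and `worker_fn` are placeholder names assumed from the `ClusterCoordinator` examples:

```python
per_worker_dataset = coordinator.create_per_worker_dataset(dataset_fn)
per_worker_iter = iter(per_worker_dataset)   # a PerWorkerValues of iterators

@tf.function
def worker_fn(iterator):
  # Each worker reads from its own iterator component.
  return tf.reduce_sum(next(iterator))

# schedule() passes each worker the value that lives on that worker.
result = coordinator.schedule(worker_fn, args=(per_worker_iter,))
```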
@@ -948,14 +948,13 @@ class ClusterCoordinator(object):
failed worker, it will be added for function execution after datasets created
by `create_per_worker_dataset` are re-built on it.

When a parameter server fails, a `tf.errors.UnavailableError` is raised by
`schedule`, `join` or `done`. In this case, in addition to bringing back the
failed parameter server, users should restart the coordinator so that it
reconnects to workers and parameter servers, re-creates the variables, and
loads checkpoints. If the coordinator fails, after the user brings it back,
the program will automatically connect to workers and parameter servers, and
continue the progress from a checkpoint.

It is thus essential that in the user's program, a checkpoint file is periodically
saved, and restored at the start of the program. If an
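A minimal sketch of that periodic save/restore pattern; `strategy`, `coordinator`, and `step_fn` are assumed to be set up as in the examples in this docstring, and the paths and intervals are illustrative:

```python
import tensorflow as tf

with strategy.scope():
  v = tf.Variable(0.0)   # variables are placed on parameter servers

checkpoint = tf.train.Checkpoint(v=v)
manager = tf.train.CheckpointManager(checkpoint, '/tmp/ckpt', max_to_keep=3)

# Restore at program start; a restarted coordinator resumes from here.
checkpoint.restore(manager.latest_checkpoint)

for step in range(1000):
  coordinator.schedule(step_fn)
  if step % 100 == 0:
    coordinator.join()   # wait for scheduled functions before saving
    manager.save()
```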
@@ -1137,7 +1136,7 @@ class ClusterCoordinator(object):
def per_worker_dataset_fn():
  # Each worker builds its own copy of this dataset.
  return strategy.distribute_datasets_from_function(
      lambda x: tf.data.Dataset.from_tensor_slices([3] * 3))

per_worker_dataset = coordinator.create_per_worker_dataset(
    per_worker_dataset_fn)


@@ -52,22 +52,22 @@ class ParameterServerStrategyV2(distribute_lib.Strategy):
synchronizing with each other. Under this configuration, it is known as
asynchronous training.

In TensorFlow 2, we recommend an architecture based on central coordination
for parameter server training. Each worker and parameter server runs a
`tf.distribute.Server`, and on top of that, a coordinator task is responsible
for creating resources on workers and parameter servers, dispatching
functions, and coordinating the training. The coordinator uses a
`tf.distribute.experimental.coordinator.ClusterCoordinator` to coordinate the
cluster, and a `tf.distribute.experimental.ParameterServerStrategy` to define
variables on parameter servers and computation on workers.
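For orientation, a hedged sketch of that coordinator-side setup (cluster-resolution details are placeholders, not part of this change):

```python
import tensorflow as tf

# Assumes the coordinator task's TF_CONFIG describes the whole cluster.
cluster_resolver = tf.distribute.cluster_resolver.TFConfigClusterResolver()
strategy = tf.distribute.experimental.ParameterServerStrategy(cluster_resolver)
coordinator = tf.distribute.experimental.coordinator.ClusterCoordinator(strategy)
```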

For the training to work, the coordinator dispatches `tf.function`s to be
executed on remote workers. Upon receiving requests from the coordinator, a
worker executes the `tf.function` by reading the variables from parameter
servers, executing the ops, and updating the variables on the parameter
servers. Each worker only processes the requests from the coordinator,
and communicates with parameter servers, without direct interactions with
other workers in the cluster.

As a result, failures of some workers do not prevent the cluster from
continuing the work, and this allows the cluster to train with instances that
@@ -77,7 +77,7 @@ class ParameterServerStrategyV2(distribute_lib.Strategy):
Note that the coordinator is not one of the training workers. Instead, it
creates resources such as variables and datasets, dispatches `tf.function`s,
saves checkpoints and so on. In addition to workers, parameter servers and
the coordinator, an optional evaluator can be run on the side that
periodically reads the checkpoints saved by the coordinator and runs
evaluations against each checkpoint.
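A hedged sketch of such a sidecar evaluator loop; `build_model` and `evaluate` are placeholders, and the checkpoint directory is assumed to be the one the coordinator writes to:

```python
import time
import tensorflow as tf

eval_model = build_model()              # placeholder: mirrors the training model
checkpoint = tf.train.Checkpoint(model=eval_model)

while True:
  latest = tf.train.latest_checkpoint('/tmp/ckpt')
  if latest:
    checkpoint.restore(latest).expect_partial()
    evaluate(eval_model)                # placeholder evaluation routine
  time.sleep(60)                        # poll for new checkpoints periodically
```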
@@ -226,8 +226,8 @@ class ParameterServerStrategyV2(distribute_lib.Strategy):
```

Alternatively, you can also start a bunch of TensorFlow servers in advance and
connect to them later. The coordinator can be in the same cluster or on any
machine that has connectivity to workers and parameter servers. This is
covered in our guide and tutorial.
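A rough sketch of starting such a server in advance on a worker or parameter server task, assuming the standard `tf.distribute.Server` API (cluster details are placeholders):

```python
import tensorflow as tf

cluster_resolver = tf.distribute.cluster_resolver.TFConfigClusterResolver()
server = tf.distribute.Server(
    cluster_resolver.cluster_spec(),
    job_name=cluster_resolver.task_type,   # e.g. "worker" or "ps"
    task_index=cluster_resolver.task_id,
    protocol="grpc")
server.join()   # block; the coordinator connects to this server later
```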

__Variable creation with `strategy.scope()`__
@@ -270,9 +270,9 @@ class ParameterServerStrategyV2(distribute_lib.Strategy):
"shard" the variables across the ps. Partitioning large variables among ps is a
commonly used technique to boost training throughput and mitigate memory
constraints. It enables parallel computations and updates on different shards
of a variable, and often yields better load balancing across parameter
servers. Without sharding, models with large variables (e.g., embeddings) that
can't fit into one machine's memory would otherwise be unable to train.

With `tf.distribute.experimental.ParameterServerStrategy`, if a
`variable_partitioner` is provided to `__init__` and certain conditions are
@@ -294,40 +294,41 @@ class ParameterServerStrategyV2(distribute_lib.Strategy):
    return x * self.w

# Partition the dense layer into 2 shards.
variable_partitioner = (
    tf.distribute.experimental.partitioners.FixedShardsPartitioner(
        num_shards=2))
strategy = tf.distribute.experimental.ParameterServerStrategy(
    cluster_resolver=...,
    variable_partitioner=variable_partitioner)
with strategy.scope():
  dense = Dense()
assert len(dense.variables) == 2
assert isinstance(dense.variables[0], tf.Variable)
assert isinstance(dense.variables[1], tf.Variable)
assert dense.variables[0].shape == (50, 10)
assert dense.variables[1].shape == (50, 10)
```

The sharded variable container can be converted to a `Tensor` via
`tf.convert_to_tensor`. This means the container can be directly used in most
Python Ops where such `Tensor` conversion automatically happens. For example,
in the above code snippet, `x * self.w` would implicitly apply the said tensor
conversion. Note that such conversion can be expensive, as the variable
components need to be transferred from multiple parameter servers to where
the value is used.
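A small hedged illustration, continuing the snippet above; `dense.w` is assumed to be the sharded variable created in the (truncated) `Dense` definition:

```python
# Explicit conversion concatenates the two (50, 10) shards into a single
# (100, 10) tensor, fetching all components to the caller.
w_full = tf.convert_to_tensor(dense.w)
assert w_full.shape == (100, 10)
```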

`tf.nn.embedding_lookup` on the other hand doesn't apply the tensor
conversion, and performs parallel lookups on the variable components instead.
This is crucial to scale up embedding lookups when the embedding table
variable is large.
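For instance, a hedged sketch reusing the hypothetical `dense.w` shards above as a stand-in for an embedding table:

```python
ids = tf.constant([0, 7, 63, 99])
# Looks up rows across the shards in parallel, without first materializing
# the full (100, 10) table on the caller.
embeddings = tf.nn.embedding_lookup(dense.w, ids)
assert embeddings.shape == (4, 10)
```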

When a partitioned variable is saved to a `SavedModel`, it will be saved as if
it is one single variable. This improves serving efficiency by eliminating
a number of Ops that handle the partition aspects.

Known limitations of variable partitioning:

* Number of partitions must not change across Checkpoint saving/loading.

* After saving partitioned variables to a SavedModel, the SavedModel can't be
  loaded via `tf.saved_model.load`.
@@ -358,7 +359,6 @@ class ParameterServerStrategyV2(distribute_lib.Strategy):
coordinator = (
    tf.distribute.experimental.coordinator.ClusterCoordinator(strategy=...))
distributed_dataset = coordinator.create_per_worker_dataset(dataset_fn)
```

__Limitations__
@ -404,7 +404,7 @@ class ParameterServerStrategyV2(distribute_lib.Strategy):
* `variable_partitioner` will be called for each variable created under * `variable_partitioner` will be called for each variable created under
strategy `scope` to instruct how the variable should be partitioned. strategy `scope` to instruct how the variable should be partitioned.
Variables that have only one partition along the partitioning axis Variables that have only one partition along the partitioning axis
(i.e., no need for partition) will be created as normal `tf.Variable`. (i.e., no need for partition) will be created as a normal `tf.Variable`.
* Only the first / outermost axis partitioning is supported. * Only the first / outermost axis partitioning is supported.
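A hedged sketch of the first point, using `MinSizePartitioner` (thresholds are illustrative, and `cluster_resolver=...` is elided as in the examples above):

```python
import tensorflow as tf

partitioner = tf.distribute.experimental.partitioners.MinSizePartitioner(
    min_shard_bytes=256 << 10,   # don't split variables smaller than 256 KiB
    max_shards=2)
strategy = tf.distribute.experimental.ParameterServerStrategy(
    cluster_resolver=...,
    variable_partitioner=partitioner)

with strategy.scope():
  small = tf.Variable(tf.zeros([10, 10]))        # one partition -> plain tf.Variable
  large = tf.Variable(tf.zeros([100_000, 128]))  # partitioned along the first axis

assert isinstance(small, tf.Variable)
assert len(large.variables) == 2   # sharded container with 2 components
```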