Docstring fixes for cluster resolvers.

PiperOrigin-RevId: 315426402
Change-Id: I9a8982af6a2fe0538f9af3812572db55b29525b8
Yuefeng Zhou 2020-06-08 23:06:23 -07:00 committed by TensorFlower Gardener
parent 33014a38d9
commit d9e5e2f7b3
4 changed files with 154 additions and 11 deletions

tensorflow/python/distribute/cluster_resolver/cluster_resolver.py

@@ -63,7 +63,8 @@ class ClusterResolver(object):
 This defines the skeleton for all implementations of ClusterResolvers.
 ClusterResolvers are a way for TensorFlow to communicate with various cluster
-management systems (e.g. GCE, AWS, etc...).
+management systems (e.g. GCE, AWS, etc...) and gives TensorFlow necessary
+information to set up distributed training.

 By letting TensorFlow communicate with these systems, we will be able to
 automatically discover and resolve IP addresses for various TensorFlow
@@ -73,7 +74,8 @@ class ClusterResolver(object):
 Note to Implementors: In addition to these abstract methods, you must also
 implement the task_type, task_id, and rpc_layer attributes. You may choose
 to implement them either as properties with getters or setters or directly
-set the attributes.
+set the attributes. The task_type and task_id attributes are required by
+`tf.distribute.experimental.MultiWorkerMirroredStrategy`.

 - task_type is the name of the server's current named job (e.g. 'worker',
   'ps' in a distributed parameterized training job).
@@ -84,11 +86,11 @@ class ClusterResolver(object):
 @abc.abstractmethod
 def cluster_spec(self):
-  """Retrieve the current state of the cluster and return a ClusterSpec.
+  """Retrieve the current state of the cluster and return a `tf.train.ClusterSpec`.

 Returns:
-  A ClusterSpec representing the state of the cluster at the moment this
-  function is called.
+  A `tf.train.ClusterSpec` representing the state of the cluster at the
+  moment this function is called.

 Implementors of this function must take care in ensuring that the
 ClusterSpec returned is up-to-date at the time of calling this function.
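For reference, a minimal sketch of what this contract looks like from the caller's side, assuming a TF 2.x build where `SimpleClusterResolver` is exported under `tf.distribute.cluster_resolver`; the hostnames are placeholders:

```Python
import tensorflow as tf

cluster = tf.train.ClusterSpec({
    "worker": ["worker0.example.com:2222", "worker1.example.com:2222"]
})
resolver = tf.distribute.cluster_resolver.SimpleClusterResolver(cluster)

# cluster_spec() returns a snapshot of the cluster as a tf.train.ClusterSpec.
print(resolver.cluster_spec().as_dict())
# {'worker': ['worker0.example.com:2222', 'worker1.example.com:2222']}
```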
@@ -102,6 +104,8 @@ class ClusterResolver(object):
 def master(self, task_type=None, task_id=None, rpc_layer=None):
   """Retrieves the name or URL of the session master.

+  Note: this is only useful for TensorFlow 1.x.
+
 Args:
   task_type: (Optional) The type of the TensorFlow task of the master.
   task_id: (Optional) The index of the TensorFlow task of the master.
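As a rough illustration of why this note is TF 1.x-specific, a hedged sketch (the address is a placeholder, so the session would only connect in a real cluster):

```Python
import tensorflow as tf
import tensorflow.compat.v1 as tf1

resolver = tf.distribute.cluster_resolver.SimpleClusterResolver(
    tf.train.ClusterSpec({"worker": ["worker0.example.com:2222"]}),
    master="worker0.example.com:2222",
    rpc_layer="grpc")

# master() formats the address with the RPC layer,
# e.g. "grpc://worker0.example.com:2222".
target = resolver.master()

# In TF 1.x this target is handed to a Session; TF 2.x code would instead
# call tf.config.experimental_connect_to_cluster(resolver).
sess = tf1.Session(target)
```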
@@ -126,7 +130,7 @@ class ClusterResolver(object):
 available per worker.

 Optionally, we allow callers to specify the task_type, and task_id, for
-if they want to target a specific TensorFlow process to query
+if they want to target a specific TensorFlow task to query
 the number of accelerators. This is to support heterogenous environments,
 where the number of accelerators cores per host is different.
@@ -142,6 +146,8 @@ class ClusterResolver(object):
   A map of accelerator types to number of cores.
   """
 master = self.master(task_type, task_id)
+# TODO(b/126786766): in eager mode, we should check whether
+# `tf.config.experimental_connect_to_cluster` is called or not.
 devices = get_accelerator_devices(master, config_proto)
 mapping = collections.defaultdict(int)
 for device in devices:
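A quick way to see the result format; this sketch assumes it runs eagerly in a single local process (with no TF_CONFIG set, `TFConfigClusterResolver` points at the local process, so only local devices are counted):

```Python
import tensorflow as tf

resolver = tf.distribute.cluster_resolver.TFConfigClusterResolver()

# Returns a mapping of accelerator type to count, e.g. {'GPU': 2};
# an empty mapping on a CPU-only host.
print(resolver.num_accelerators())
```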
@@ -174,7 +180,35 @@ class ClusterResolver(object):
 @tf_export('distribute.cluster_resolver.SimpleClusterResolver')
 class SimpleClusterResolver(ClusterResolver):
-  """Simple implementation of ClusterResolver that accepts a ClusterSpec."""
+  """Simple implementation of ClusterResolver that accepts all attributes.
+
+  Please see the base class for documentation of arguments of its constructor.
+
+  It is useful if you want to specify some or all attributes.
+
+  Usage example with `tf.distribute.Strategy`:
+
+  ```Python
+  cluster = tf.train.ClusterSpec({"worker": ["worker0.example.com:2222",
+                                             "worker1.example.com:2222"]})
+
+  # On worker 0
+  cluster_resolver = SimpleClusterResolver(cluster, task_type="worker",
+                                           task_id=0,
+                                           num_accelerators={"GPU": 8},
+                                           rpc_layer="grpc")
+  strategy = tf.distribute.experimental.MultiWorkerMirroredStrategy(
+      cluster_resolver=cluster_resolver)
+
+  # On worker 1
+  cluster_resolver = SimpleClusterResolver(cluster, task_type="worker",
+                                           task_id=1,
+                                           num_accelerators={"GPU": 8},
+                                           rpc_layer="grpc")
+  strategy = tf.distribute.experimental.MultiWorkerMirroredStrategy(
+      cluster_resolver=cluster_resolver)
+  ```
+  """

 def __init__(self, cluster_spec, master='', task_type=None, task_id=None,
              environment='', num_accelerators=None,
@@ -190,7 +224,7 @@ class SimpleClusterResolver(ClusterResolver):
 self._rpc_layer = rpc_layer

 if not isinstance(cluster_spec, ClusterSpec):
-  raise TypeError('cluster_spec must be a ClusterSpec.')
+  raise TypeError('cluster_spec must be a `tf.train.ClusterSpec`.')
 self._cluster_spec = cluster_spec

 if not isinstance(master, str):
@@ -204,6 +238,8 @@ class SimpleClusterResolver(ClusterResolver):
 def master(self, task_type=None, task_id=None, rpc_layer=None):
   """Returns the master address to use when creating a session.

+  Note: this is only useful for TensorFlow 1.x.
+
 Args:
   task_type: (Optional) The type of the TensorFlow task of the master.
   task_id: (Optional) The index of the TensorFlow task of the master.
@@ -249,9 +285,8 @@ class SimpleClusterResolver(ClusterResolver):
 """Returns the number of accelerator cores per worker.

 The SimpleClusterResolver does not do automatic detection of accelerators,
-so a TensorFlow session will never be created, and thus all arguments are
-unused and we simply assume that the type of accelerator is a GPU and return
-the value in provided to us in the constructor.
+and thus all arguments are unused and we simply return the value provided
+in the constructor.

 Args:
   task_type: Unused.
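In other words (a small sketch; the worker address is a placeholder):

```Python
import tensorflow as tf

resolver = tf.distribute.cluster_resolver.SimpleClusterResolver(
    tf.train.ClusterSpec({"worker": ["worker0.example.com:2222"]}),
    num_accelerators={"GPU": 4})

# No session or device query happens; the dict passed to the constructor
# is returned as-is.
assert resolver.num_accelerators() == {"GPU": 4}
```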
@@ -285,6 +320,36 @@ class UnionClusterResolver(ClusterResolver):
 For additional ClusterResolver properties such as task type, task index,
 rpc layer, environment, etc..., we will return the value from the first
 ClusterResolver in the union.
+
+  An example to combine two cluster resolvers:
+
+  ```Python
+  cluster_0 = tf.train.ClusterSpec({"worker": ["worker0.example.com:2222",
+                                               "worker1.example.com:2222"]})
+  cluster_resolver_0 = SimpleClusterResolver(cluster_0, task_type="worker",
+                                             task_id=0,
+                                             rpc_layer="grpc")
+
+  cluster_1 = tf.train.ClusterSpec({"ps": ["ps0.example.com:2222",
+                                           "ps1.example.com:2222"]})
+  cluster_resolver_1 = SimpleClusterResolver(cluster_1, task_type="ps",
+                                             task_id=0,
+                                             rpc_layer="grpc")
+
+  # Its task type would be "worker".
+  cluster_resolver = UnionClusterResolver(cluster_resolver_0,
+                                          cluster_resolver_1)
+  ```
+
+  An example to override the number of GPUs in a TFConfigClusterResolver
+  instance:
+
+  ```Python
+  tf_config = TFConfigClusterResolver()
+  gpu_override = SimpleClusterResolver(tf_config.cluster_spec(),
+                                       num_accelerators={"GPU": 1})
+  cluster_resolver = UnionClusterResolver(gpu_override, tf_config)
+  ```
 """

 def __init__(self, *args, **kwargs):
@@ -400,6 +465,8 @@ class UnionClusterResolver(ClusterResolver):
 This usually returns the master from the first ClusterResolver passed in,
 but you can override this by specifying the task_type and task_id.

+Note: this is only useful for TensorFlow 1.x.
+
 Args:
   task_type: (Optional) The type of the TensorFlow task of the master.
   task_id: (Optional) The index of the TensorFlow task of the master.

tensorflow/python/distribute/cluster_resolver/gce_cluster_resolver.py

@@ -40,6 +40,29 @@ class GCEClusterResolver(ClusterResolver):
 this will retrieve the IP address of all the instances within the instance
 group and return a ClusterResolver object suitable for use for distributed
 TensorFlow.
+
+  Note: this cluster resolver cannot retrieve `task_type`, `task_id` or
+  `rpc_layer`. To use it with some distribution strategies like
+  `tf.distribute.experimental.MultiWorkerMirroredStrategy`, you will need to
+  specify `task_type` and `task_id` in the constructor.
+
+  Usage example with tf.distribute.Strategy:
+
+  ```Python
+  # On worker 0
+  cluster_resolver = GCEClusterResolver("my-project", "us-west1",
+                                        "my-instance-group",
+                                        task_type="worker", task_id=0)
+  strategy = tf.distribute.experimental.MultiWorkerMirroredStrategy(
+      cluster_resolver=cluster_resolver)
+
+  # On worker 1
+  cluster_resolver = GCEClusterResolver("my-project", "us-west1",
+                                        "my-instance-group",
+                                        task_type="worker", task_id=1)
+  strategy = tf.distribute.experimental.MultiWorkerMirroredStrategy(
+      cluster_resolver=cluster_resolver)
+  ```
 """

 def __init__(self,

tensorflow/python/distribute/cluster_resolver/kubernetes_cluster_resolver.py

@@ -39,6 +39,31 @@ class KubernetesClusterResolver(ClusterResolver):
 the Kubernetes namespace and label selector for pods, we will retrieve the
 pod IP addresses of all running pods matching the selector, and return a
 ClusterSpec based on that information.
+
+  Note: it cannot retrieve `task_type`, `task_id` or `rpc_layer`. To use it
+  with some distribution strategies like
+  `tf.distribute.experimental.MultiWorkerMirroredStrategy`, you will need to
+  specify `task_type` and `task_id` by setting these attributes.
+
+  Usage example with tf.distribute.Strategy:
+
+  ```Python
+  # On worker 0
+  cluster_resolver = KubernetesClusterResolver(
+      {"worker": ["job-name=worker-cluster-a", "job-name=worker-cluster-b"]})
+  cluster_resolver.task_type = "worker"
+  cluster_resolver.task_id = 0
+  strategy = tf.distribute.experimental.MultiWorkerMirroredStrategy(
+      cluster_resolver=cluster_resolver)
+
+  # On worker 1
+  cluster_resolver = KubernetesClusterResolver(
+      {"worker": ["job-name=worker-cluster-a", "job-name=worker-cluster-b"]})
+  cluster_resolver.task_type = "worker"
+  cluster_resolver.task_id = 1
+  strategy = tf.distribute.experimental.MultiWorkerMirroredStrategy(
+      cluster_resolver=cluster_resolver)
+  ```
 """

 def __init__(self,
@@ -101,6 +126,8 @@ class KubernetesClusterResolver(ClusterResolver):
 parameters when using this function. If you do both, the function parameters
 will override the object properties.

+Note: this is only useful for TensorFlow 1.x.
+
 Args:
   task_type: (Optional) The type of the TensorFlow task of the master.
   task_id: (Optional) The index of the TensorFlow task of the master.

tensorflow/python/distribute/cluster_resolver/tfconfig_cluster_resolver.py

@@ -55,6 +55,30 @@ class TFConfigClusterResolver(ClusterResolver):
 This is an implementation of cluster resolvers when using TF_CONFIG to set
 information about the cluster. The cluster spec returned will be
 initialized from the TF_CONFIG environment variable.
+
+  An example to set TF_CONFIG is:
+
+  ```Python
+  os.environ['TF_CONFIG'] = json.dumps({
+      'cluster': {
+          'worker': ["localhost:12345", "localhost:23456"]
+      },
+      'task': {'type': 'worker', 'index': 0}
+  })
+  ```
+
+  However, sometimes the container orchestration framework will set TF_CONFIG
+  for you. In this case, you can just create an instance without passing in any
+  arguments. You can find an example here to let Kubernetes set TF_CONFIG for
+  you: https://github.com/tensorflow/ecosystem/tree/master/kubernetes. Then you
+  can use it with `tf.distribute.Strategy` as:
+
+  ```Python
+  # `TFConfigClusterResolver` is already the default one in the following
+  # strategy.
+  strategy = tf.distribute.experimental.MultiWorkerMirroredStrategy(
+      cluster_resolver=TFConfigClusterResolver())
+  ```
 """

 def __init__(self,
@@ -140,6 +164,8 @@ class TFConfigClusterResolver(ClusterResolver):
 def master(self, task_type=None, task_id=None, rpc_layer=None):
   """Returns the master address to use when creating a TensorFlow session.

+  Note: this is only useful for TensorFlow 1.x.
+
 Args:
   task_type: (String, optional) Overrides and sets the task_type of the
     master.