From d9e5e2f7b34f803c11d0b7a6e9c3100e6d139d74 Mon Sep 17 00:00:00 2001 From: Yuefeng Zhou Date: Mon, 8 Jun 2020 23:06:23 -0700 Subject: [PATCH] Docstring fixes for cluster resolvers. PiperOrigin-RevId: 315426402 Change-Id: I9a8982af6a2fe0538f9af3812572db55b29525b8 --- .../cluster_resolver/cluster_resolver.py | 89 ++++++++++++++++--- .../cluster_resolver/gce_cluster_resolver.py | 23 +++++ .../kubernetes_cluster_resolver.py | 27 ++++++ .../tfconfig_cluster_resolver.py | 26 ++++++ 4 files changed, 154 insertions(+), 11 deletions(-) diff --git a/tensorflow/python/distribute/cluster_resolver/cluster_resolver.py b/tensorflow/python/distribute/cluster_resolver/cluster_resolver.py index e15b6ab01f8..a8babc21af6 100644 --- a/tensorflow/python/distribute/cluster_resolver/cluster_resolver.py +++ b/tensorflow/python/distribute/cluster_resolver/cluster_resolver.py @@ -63,7 +63,8 @@ class ClusterResolver(object): This defines the skeleton for all implementations of ClusterResolvers. ClusterResolvers are a way for TensorFlow to communicate with various cluster - management systems (e.g. GCE, AWS, etc...). + management systems (e.g. GCE, AWS, etc...) and gives TensorFlow necessary + information to set up distributed training. By letting TensorFlow communicate with these systems, we will be able to automatically discover and resolve IP addresses for various TensorFlow @@ -73,7 +74,8 @@ class ClusterResolver(object): Note to Implementors: In addition to these abstract methods, you must also implement the task_type, task_id, and rpc_layer attributes. You may choose to implement them either as properties with getters or setters or directly - set the attributes. + set the attributes. The task_type and task_id attributes are required by + `tf.distribute.experimental.MultiWorkerMirroredStrategy`. - task_type is the name of the server's current named job (e.g. 'worker', 'ps' in a distributed parameterized training job). 
@@ -84,11 +86,11 @@ class ClusterResolver(object): @abc.abstractmethod def cluster_spec(self): - """Retrieve the current state of the cluster and return a ClusterSpec. + """Retrieve the current state of the cluster and return a `tf.train.ClusterSpec`. Returns: - A ClusterSpec representing the state of the cluster at the moment this - function is called. + A `tf.train.ClusterSpec` representing the state of the cluster at the + moment this function is called. Implementors of this function must take care in ensuring that the ClusterSpec returned is up-to-date at the time of calling this function. @@ -102,6 +104,8 @@ class ClusterResolver(object): def master(self, task_type=None, task_id=None, rpc_layer=None): """Retrieves the name or URL of the session master. + Note: this is only useful for TensorFlow 1.x. + Args: task_type: (Optional) The type of the TensorFlow task of the master. task_id: (Optional) The index of the TensorFlow task of the master. @@ -126,7 +130,7 @@ class ClusterResolver(object): available per worker. Optionally, we allow callers to specify the task_type, and task_id, for - if they want to target a specific TensorFlow process to query + if they want to target a specific TensorFlow task to query the number of accelerators. This is to support heterogenous environments, where the number of accelerators cores per host is different. @@ -142,6 +146,8 @@ class ClusterResolver(object): A map of accelerator types to number of cores. """ master = self.master(task_type, task_id) + # TODO(b/126786766): in eager mode, we should check whether + # `tf.config.experimental_connect_to_cluster` is called or not. 
devices = get_accelerator_devices(master, config_proto) mapping = collections.defaultdict(int) for device in devices: @@ -174,7 +180,35 @@ class ClusterResolver(object): @tf_export('distribute.cluster_resolver.SimpleClusterResolver') class SimpleClusterResolver(ClusterResolver): - """Simple implementation of ClusterResolver that accepts a ClusterSpec.""" + """Simple implementation of ClusterResolver that accepts all attributes. + + Please see the base class for documentation of arguments of its constructor. + + It is useful if you want to specify some or all attributes. + + Usage example with `tf.distribute.Strategy`: + + ```Python + cluster = tf.train.ClusterSpec({"worker": ["worker0.example.com:2222", + "worker1.example.com:2222"]}) + + # On worker 0 + cluster_resolver = SimpleClusterResolver(cluster, task_type="worker", + task_id=0, + num_accelerators={"GPU": 8}, + rpc_layer="grpc") + strategy = tf.distribute.experimental.MultiWorkerMirroredStrategy( + cluster_resolver=cluster_resolver) + + # On worker 1 + cluster_resolver = SimpleClusterResolver(cluster, task_type="worker", + task_id=1, + num_accelerators={"GPU": 8}, + rpc_layer="grpc") + strategy = tf.distribute.experimental.MultiWorkerMirroredStrategy( + cluster_resolver=cluster_resolver) + ``` + """ def __init__(self, cluster_spec, master='', task_type=None, task_id=None, environment='', num_accelerators=None, @@ -190,7 +224,7 @@ class SimpleClusterResolver(ClusterResolver): self._rpc_layer = rpc_layer if not isinstance(cluster_spec, ClusterSpec): - raise TypeError('cluster_spec must be a ClusterSpec.') + raise TypeError('cluster_spec must be a `tf.train.ClusterSpec`.') self._cluster_spec = cluster_spec if not isinstance(master, str): @@ -204,6 +238,8 @@ class SimpleClusterResolver(ClusterResolver): def master(self, task_type=None, task_id=None, rpc_layer=None): """Returns the master address to use when creating a session. + Note: this is only useful for TensorFlow 1.x. 
+ Args: task_type: (Optional) The type of the TensorFlow task of the master. task_id: (Optional) The index of the TensorFlow task of the master. @@ -249,9 +285,8 @@ class SimpleClusterResolver(ClusterResolver): """Returns the number of accelerator cores per worker. The SimpleClusterResolver does not do automatic detection of accelerators, - so a TensorFlow session will never be created, and thus all arguments are - unused and we simply assume that the type of accelerator is a GPU and return - the value in provided to us in the constructor. + and thus all arguments are unused and we simply return the value provided + in the constructor. Args: task_type: Unused. @@ -285,6 +320,36 @@ class UnionClusterResolver(ClusterResolver): For additional ClusterResolver properties such as task type, task index, rpc layer, environment, etc..., we will return the value from the first ClusterResolver in the union. + + An example to combine two cluster resolvers: + + ```Python + cluster_0 = tf.train.ClusterSpec({"worker": ["worker0.example.com:2222", + "worker1.example.com:2222"]}) + cluster_resolver_0 = SimpleClusterResolver(cluster_0, task_type="worker", + task_id=0, + rpc_layer="grpc") + + cluster_1 = tf.train.ClusterSpec({"ps": ["ps0.example.com:2222", + "ps1.example.com:2222"]}) + cluster_resolver_1 = SimpleClusterResolver(cluster_1, task_type="ps", + task_id=0, + rpc_layer="grpc") + + # Its task type would be "worker". 
+ cluster_resolver = UnionClusterResolver(cluster_resolver_0, + cluster_resolver_1) + ``` + + An example to override the number of GPUs in a TFConfigClusterResolver + instance: + + ```Python + tf_config = TFConfigClusterResolver() + gpu_override = SimpleClusterResolver(tf_config.cluster_spec(), + num_accelerators={"GPU": 1}) + cluster_resolver = UnionClusterResolver(gpu_override, tf_config) + ``` """ def __init__(self, *args, **kwargs): @@ -400,6 +465,8 @@ class UnionClusterResolver(ClusterResolver): This usually returns the master from the first ClusterResolver passed in, but you can override this by specifying the task_type and task_id. + Note: this is only useful for TensorFlow 1.x. + Args: task_type: (Optional) The type of the TensorFlow task of the master. task_id: (Optional) The index of the TensorFlow task of the master. diff --git a/tensorflow/python/distribute/cluster_resolver/gce_cluster_resolver.py b/tensorflow/python/distribute/cluster_resolver/gce_cluster_resolver.py index 70d42e80a70..14548ed4350 100644 --- a/tensorflow/python/distribute/cluster_resolver/gce_cluster_resolver.py +++ b/tensorflow/python/distribute/cluster_resolver/gce_cluster_resolver.py @@ -40,6 +40,29 @@ class GCEClusterResolver(ClusterResolver): this will retrieve the IP address of all the instances within the instance group and return a ClusterResolver object suitable for use for distributed TensorFlow. + + Note: this cluster resolver cannot retrieve `task_type`, `task_id` or + `rpc_layer`. To use it with some distribution strategies like + `tf.distribute.experimental.MultiWorkerMirroredStrategy`, you will need to + specify `task_type` and `task_id` in the constructor. 
+ + Usage example with tf.distribute.Strategy: + + ```Python + # On worker 0 + cluster_resolver = GCEClusterResolver("my-project", "us-west1", + "my-instance-group", + task_type="worker", task_id=0) + strategy = tf.distribute.experimental.MultiWorkerMirroredStrategy( + cluster_resolver=cluster_resolver) + + # On worker 1 + cluster_resolver = GCEClusterResolver("my-project", "us-west1", + "my-instance-group", + task_type="worker", task_id=1) + strategy = tf.distribute.experimental.MultiWorkerMirroredStrategy( + cluster_resolver=cluster_resolver) + ``` """ def __init__(self, diff --git a/tensorflow/python/distribute/cluster_resolver/kubernetes_cluster_resolver.py b/tensorflow/python/distribute/cluster_resolver/kubernetes_cluster_resolver.py index f812df0e5c7..27dda7977f6 100644 --- a/tensorflow/python/distribute/cluster_resolver/kubernetes_cluster_resolver.py +++ b/tensorflow/python/distribute/cluster_resolver/kubernetes_cluster_resolver.py @@ -39,6 +39,31 @@ class KubernetesClusterResolver(ClusterResolver): the Kubernetes namespace and label selector for pods, we will retrieve the pod IP addresses of all running pods matching the selector, and return a ClusterSpec based on that information. + + Note: it cannot retrieve `task_type`, `task_id` or `rpc_layer`. To use it + with some distribution strategies like + `tf.distribute.experimental.MultiWorkerMirroredStrategy`, you will need to + specify `task_type` and `task_id` by setting these attributes. 
+ + Usage example with tf.distribute.Strategy: + + ```Python + # On worker 0 + cluster_resolver = KubernetesClusterResolver( + {"worker": ["job-name=worker-cluster-a", "job-name=worker-cluster-b"]}) + cluster_resolver.task_type = "worker" + cluster_resolver.task_id = 0 + strategy = tf.distribute.experimental.MultiWorkerMirroredStrategy( + cluster_resolver=cluster_resolver) + + # On worker 1 + cluster_resolver = KubernetesClusterResolver( + {"worker": ["job-name=worker-cluster-a", "job-name=worker-cluster-b"]}) + cluster_resolver.task_type = "worker" + cluster_resolver.task_id = 1 + strategy = tf.distribute.experimental.MultiWorkerMirroredStrategy( + cluster_resolver=cluster_resolver) + ``` """ def __init__(self, @@ -101,6 +126,8 @@ class KubernetesClusterResolver(ClusterResolver): parameters when using this function. If you do both, the function parameters will override the object properties. + Note: this is only useful for TensorFlow 1.x. + Args: task_type: (Optional) The type of the TensorFlow task of the master. task_id: (Optional) The index of the TensorFlow task of the master. diff --git a/tensorflow/python/distribute/cluster_resolver/tfconfig_cluster_resolver.py b/tensorflow/python/distribute/cluster_resolver/tfconfig_cluster_resolver.py index 305af265b03..30063d090d1 100644 --- a/tensorflow/python/distribute/cluster_resolver/tfconfig_cluster_resolver.py +++ b/tensorflow/python/distribute/cluster_resolver/tfconfig_cluster_resolver.py @@ -55,6 +55,30 @@ class TFConfigClusterResolver(ClusterResolver): This is an implementation of cluster resolvers when using TF_CONFIG to set information about the cluster. The cluster spec returned will be initialized from the TF_CONFIG environment variable. 
+ + An example to set TF_CONFIG is: + + ```Python + os.environ['TF_CONFIG'] = json.dumps({ + 'cluster': { + 'worker': ["localhost:12345", "localhost:23456"] + }, + 'task': {'type': 'worker', 'index': 0} + }) + ``` + + However, sometimes the container orchestration framework will set TF_CONFIG + for you. In this case, you can just create an instance without passing in any + arguments. You can find an example here to let Kubernetes set TF_CONFIG for + you: https://github.com/tensorflow/ecosystem/tree/master/kubernetes. Then you + can use it with `tf.distribute.Strategy` as: + + ```Python + # `TFConfigClusterResolver` is already the default one in the following + # strategy. + strategy = tf.distribute.experimental.MultiWorkerMirroredStrategy( + cluster_resolver=TFConfigClusterResolver()) + ``` """ def __init__(self, @@ -140,6 +164,8 @@ class TFConfigClusterResolver(ClusterResolver): def master(self, task_type=None, task_id=None, rpc_layer=None): """Returns the master address to use when creating a TensorFlow session. + Note: this is only useful for TensorFlow 1.x. + Args: task_type: (String, optional) Overrides and sets the task_type of the master.