From 61c2e69663ee78de30431dc129d059d29d6d80ea Mon Sep 17 00:00:00 2001 From: Rick Chao Date: Mon, 22 Jun 2020 14:33:38 -0700 Subject: [PATCH] Make task_type and task_id standard properties in tf.distribute cluster resolvers. PiperOrigin-RevId: 317736970 Change-Id: Ia9c76462afc4c2fcc42a149960b50b2cbcafd482 --- .../cluster_resolver/cluster_resolver.py | 121 +++++++++++++++++- .../gce_cluster_resolver_test.py | 32 +++++ ...e.cluster_resolver.-cluster-resolver.pbtxt | 8 ++ ...esolver.-kubernetes-cluster-resolver.pbtxt | 8 ++ ...ter_resolver.-slurm-cluster-resolver.pbtxt | 8 ++ ...ter_resolver.-t-p-u-cluster-resolver.pbtxt | 8 ++ ...e.cluster_resolver.-cluster-resolver.pbtxt | 8 ++ ...esolver.-kubernetes-cluster-resolver.pbtxt | 8 ++ ...ter_resolver.-slurm-cluster-resolver.pbtxt | 8 ++ ...ter_resolver.-t-p-u-cluster-resolver.pbtxt | 8 ++ 10 files changed, 212 insertions(+), 5 deletions(-) diff --git a/tensorflow/python/distribute/cluster_resolver/cluster_resolver.py b/tensorflow/python/distribute/cluster_resolver/cluster_resolver.py index a8babc21af6..f43bfc9845f 100644 --- a/tensorflow/python/distribute/cluster_resolver/cluster_resolver.py +++ b/tensorflow/python/distribute/cluster_resolver/cluster_resolver.py @@ -71,11 +71,22 @@ class ClusterResolver(object): workers. This will eventually allow us to automatically recover from underlying machine failures and scale TensorFlow worker clusters up and down. - Note to Implementors: In addition to these abstract methods, you must also - implement the task_type, task_id, and rpc_layer attributes. You may choose - to implement them either as properties with getters or setters or directly - set the attributes. The task_type and task_id attributes are required by - `tf.distribute.experimental.MultiWorkerMirroredStrategy`. 
+ Note to Implementors of `tf.distribute.cluster_resolver.ClusterResolver` + subclass: In addition to these abstract methods, when task_type, task_id, and + rpc_layer attributes are applicable, you should also implement them either as + properties with getters or setters, or directly set the attributes + `self._task_type`, `self._task_id`, or `self._rpc_layer` so the base class' + getters and setters are used. See + `tf.distribute.cluster_resolver.SimpleClusterResolver.__init__` for an + example. + + In general, multi-client tf.distribute strategies such as + `tf.distribute.experimental.MultiWorkerMirroredStrategy` require task_type and + task_id properties to be available in the `ClusterResolver` they are using. On + the other hand, these concepts are not applicable in single-client strategies, + such as `tf.distribute.experimental.TPUStrategy`, because the program is only + expected to be run on one task, so there should not be a need to have code + branches according to task type and task id. - task_type is the name of the server's current named job (e.g. 'worker', 'ps' in a distributed parameterized training job). @@ -177,6 +188,106 @@ class ClusterResolver(object): """ return '' + @property + def task_type(self): + """Returns the task type this `ClusterResolver` indicates. + + In TensorFlow distributed environment, each job may have an applicable + task type. Valid task types in TensorFlow include + 'chief': a worker that is designated with more responsibility, + 'worker': a regular worker for training/evaluation, + 'ps': a parameter server, or + 'evaluator': an evaluator that evaluates the checkpoints for metrics. + + See [Multi-worker configuration]( + https://www.tensorflow.org/tutorials/distribute/multi_worker_with_keras#multi-worker_configuration) + for more information about 'chief' and 'worker' task type, which are most + commonly used. + + Having access to such information is useful when user needs to run specific + code according to task types. 
For example, + + ```python + cluster_spec = tf.train.ClusterSpec({ + "ps": ["localhost:2222", "localhost:2223"], + "worker": ["localhost:2224", "localhost:2225", "localhost:2226"] + }) + + # SimpleClusterResolver is used here for illustration; other cluster + # resolvers may be used for other source of task type/id. + simple_resolver = SimpleClusterResolver(cluster_spec, task_type="worker", + task_id=1) + + ... + + if cluster_resolver.task_type == 'worker': + # Perform something that's only applicable on workers. This block + # will run on this particular instance since we've specified this task to + # be a worker in above cluster resolver. + elif cluster_resolver.task_type == 'ps': + # Perform something that's only applicable on parameter servers. This + # block will not run on this particular instance. + ``` + + Returns `None` if such information is not available or is not applicable + in the current distributed environment, such as training with + `tf.distribute.experimental.TPUStrategy`. + + For more information, please see + `tf.distribute.cluster_resolver.ClusterResolver`'s class doc. + """ + return getattr(self, '_task_type', None) + + @property + def task_id(self): + """Returns the task id this `ClusterResolver` indicates. + + In TensorFlow distributed environment, each job may have an applicable + task id, which is the index of the instance within its task type. This is + useful when user needs to run specific code according to task index. For + example, + + ```python + cluster_spec = tf.train.ClusterSpec({ + "ps": ["localhost:2222", "localhost:2223"], + "worker": ["localhost:2224", "localhost:2225", "localhost:2226"] + }) + + # SimpleClusterResolver is used here for illustration; other cluster + # resolvers may be used for other source of task type/id. + simple_resolver = SimpleClusterResolver(cluster_spec, task_type="worker", + task_id=0) + + ... 
+ + if cluster_resolver.task_type == 'worker' and cluster_resolver.task_id == 0: + # Perform something that's only applicable on 'worker' type, id 0. This + # block will run on this particular instance since we've specified this + # task to be a 'worker', id 0 in above cluster resolver. + else: + # Perform something that's only applicable on other ids. This block will + # not run on this particular instance. + ``` + + Returns `None` if such information is not available or is not applicable + in the current distributed environment, such as training with + `tf.distribute.cluster_resolver.TPUClusterResolver`. + + For more information, please see + `tf.distribute.cluster_resolver.ClusterResolver`'s class docstring. + """ + return getattr(self, '_task_id', None) + + @task_type.setter + def task_type(self, task_type): + """Setter of `task_type` property. See `task_type` property doc.""" + self._task_type = task_type + + @task_id.setter + def task_id(self, task_id): + """Setter of `task_id` property. 
See `task_id` property doc.""" + self._task_id = task_id @tf_export('distribute.cluster_resolver.SimpleClusterResolver') class SimpleClusterResolver(ClusterResolver): diff --git a/tensorflow/python/distribute/cluster_resolver/gce_cluster_resolver_test.py b/tensorflow/python/distribute/cluster_resolver/gce_cluster_resolver_test.py index f39c86a0495..d8037497cb9 100644 --- a/tensorflow/python/distribute/cluster_resolver/gce_cluster_resolver_test.py +++ b/tensorflow/python/distribute/cluster_resolver/gce_cluster_resolver_test.py @@ -310,5 +310,37 @@ class GCEClusterResolverTest(test.TestCase): """ self._verifyClusterSpecEquality(actual_cluster_spec, expected_proto) + def testSettingTaskTypeRaiseError(self): + name_to_ip = [ + { + 'name': 'instance1', + 'ip': '10.1.2.3' + }, + { + 'name': 'instance2', + 'ip': '10.2.3.4' + }, + { + 'name': 'instance3', + 'ip': '10.3.4.5' + }, + ] + + gce_cluster_resolver = GCEClusterResolver( + project='test-project', + zone='us-east1-d', + instance_group='test-instance-group', + task_type='testworker', + port=8470, + credentials=None, + service=self.gen_standard_mock_service_client(name_to_ip)) + + with self.assertRaisesRegexp( + RuntimeError, 'You cannot reset the task_type ' + 'of the GCEClusterResolver after it has ' + 'been created.'): + gce_cluster_resolver.task_type = 'foobar' + + if __name__ == '__main__': test.main() diff --git a/tensorflow/tools/api/golden/v1/tensorflow.distribute.cluster_resolver.-cluster-resolver.pbtxt b/tensorflow/tools/api/golden/v1/tensorflow.distribute.cluster_resolver.-cluster-resolver.pbtxt index 0b35b61b4c0..c9dbca7368e 100644 --- a/tensorflow/tools/api/golden/v1/tensorflow.distribute.cluster_resolver.-cluster-resolver.pbtxt +++ b/tensorflow/tools/api/golden/v1/tensorflow.distribute.cluster_resolver.-cluster-resolver.pbtxt @@ -6,6 +6,14 @@ tf_class { name: "environment" mtype: "" } + member { + name: "task_id" + mtype: "" + } + member { + name: "task_type" + mtype: "" + } member_method { name:
"__init__" } diff --git a/tensorflow/tools/api/golden/v1/tensorflow.distribute.cluster_resolver.-kubernetes-cluster-resolver.pbtxt b/tensorflow/tools/api/golden/v1/tensorflow.distribute.cluster_resolver.-kubernetes-cluster-resolver.pbtxt index 3220d68e054..2819ca85612 100644 --- a/tensorflow/tools/api/golden/v1/tensorflow.distribute.cluster_resolver.-kubernetes-cluster-resolver.pbtxt +++ b/tensorflow/tools/api/golden/v1/tensorflow.distribute.cluster_resolver.-kubernetes-cluster-resolver.pbtxt @@ -7,6 +7,14 @@ tf_class { name: "environment" mtype: "" } + member { + name: "task_id" + mtype: "" + } + member { + name: "task_type" + mtype: "" + } member_method { name: "__init__" argspec: "args=[\'self\', \'job_to_label_mapping\', \'tf_server_port\', \'rpc_layer\', \'override_client\'], varargs=None, keywords=None, defaults=[\'None\', \'8470\', \'grpc\', \'None\'], " diff --git a/tensorflow/tools/api/golden/v1/tensorflow.distribute.cluster_resolver.-slurm-cluster-resolver.pbtxt b/tensorflow/tools/api/golden/v1/tensorflow.distribute.cluster_resolver.-slurm-cluster-resolver.pbtxt index 30cfac0830c..a2c63243c86 100644 --- a/tensorflow/tools/api/golden/v1/tensorflow.distribute.cluster_resolver.-slurm-cluster-resolver.pbtxt +++ b/tensorflow/tools/api/golden/v1/tensorflow.distribute.cluster_resolver.-slurm-cluster-resolver.pbtxt @@ -7,6 +7,14 @@ tf_class { name: "environment" mtype: "" } + member { + name: "task_id" + mtype: "" + } + member { + name: "task_type" + mtype: "" + } member_method { name: "__init__" argspec: "args=[\'self\', \'jobs\', \'port_base\', \'gpus_per_node\', \'gpus_per_task\', \'tasks_per_node\', \'auto_set_gpu\', \'rpc_layer\'], varargs=None, keywords=None, defaults=[\'None\', \'8888\', \'None\', \'None\', \'None\', \'True\', \'grpc\'], " diff --git a/tensorflow/tools/api/golden/v1/tensorflow.distribute.cluster_resolver.-t-p-u-cluster-resolver.pbtxt b/tensorflow/tools/api/golden/v1/tensorflow.distribute.cluster_resolver.-t-p-u-cluster-resolver.pbtxt index 
8eee489df93..165f8d43ace 100644 --- a/tensorflow/tools/api/golden/v1/tensorflow.distribute.cluster_resolver.-t-p-u-cluster-resolver.pbtxt +++ b/tensorflow/tools/api/golden/v1/tensorflow.distribute.cluster_resolver.-t-p-u-cluster-resolver.pbtxt @@ -7,6 +7,14 @@ tf_class { name: "environment" mtype: "" } + member { + name: "task_id" + mtype: "" + } + member { + name: "task_type" + mtype: "" + } member_method { name: "__init__" argspec: "args=[\'self\', \'tpu\', \'zone\', \'project\', \'job_name\', \'coordinator_name\', \'coordinator_address\', \'credentials\', \'service\', \'discovery_url\'], varargs=None, keywords=None, defaults=[\'None\', \'None\', \'None\', \'worker\', \'None\', \'None\', \'default\', \'None\', \'None\'], " diff --git a/tensorflow/tools/api/golden/v2/tensorflow.distribute.cluster_resolver.-cluster-resolver.pbtxt b/tensorflow/tools/api/golden/v2/tensorflow.distribute.cluster_resolver.-cluster-resolver.pbtxt index 0b35b61b4c0..c9dbca7368e 100644 --- a/tensorflow/tools/api/golden/v2/tensorflow.distribute.cluster_resolver.-cluster-resolver.pbtxt +++ b/tensorflow/tools/api/golden/v2/tensorflow.distribute.cluster_resolver.-cluster-resolver.pbtxt @@ -6,6 +6,14 @@ tf_class { name: "environment" mtype: "" } + member { + name: "task_id" + mtype: "" + } + member { + name: "task_type" + mtype: "" + } member_method { name: "__init__" } diff --git a/tensorflow/tools/api/golden/v2/tensorflow.distribute.cluster_resolver.-kubernetes-cluster-resolver.pbtxt b/tensorflow/tools/api/golden/v2/tensorflow.distribute.cluster_resolver.-kubernetes-cluster-resolver.pbtxt index 3220d68e054..2819ca85612 100644 --- a/tensorflow/tools/api/golden/v2/tensorflow.distribute.cluster_resolver.-kubernetes-cluster-resolver.pbtxt +++ b/tensorflow/tools/api/golden/v2/tensorflow.distribute.cluster_resolver.-kubernetes-cluster-resolver.pbtxt @@ -7,6 +7,14 @@ tf_class { name: "environment" mtype: "" } + member { + name: "task_id" + mtype: "" + } + member { + name: "task_type" + mtype: "" + } 
member_method { name: "__init__" argspec: "args=[\'self\', \'job_to_label_mapping\', \'tf_server_port\', \'rpc_layer\', \'override_client\'], varargs=None, keywords=None, defaults=[\'None\', \'8470\', \'grpc\', \'None\'], " diff --git a/tensorflow/tools/api/golden/v2/tensorflow.distribute.cluster_resolver.-slurm-cluster-resolver.pbtxt b/tensorflow/tools/api/golden/v2/tensorflow.distribute.cluster_resolver.-slurm-cluster-resolver.pbtxt index 30cfac0830c..a2c63243c86 100644 --- a/tensorflow/tools/api/golden/v2/tensorflow.distribute.cluster_resolver.-slurm-cluster-resolver.pbtxt +++ b/tensorflow/tools/api/golden/v2/tensorflow.distribute.cluster_resolver.-slurm-cluster-resolver.pbtxt @@ -7,6 +7,14 @@ tf_class { name: "environment" mtype: "" } + member { + name: "task_id" + mtype: "" + } + member { + name: "task_type" + mtype: "" + } member_method { name: "__init__" argspec: "args=[\'self\', \'jobs\', \'port_base\', \'gpus_per_node\', \'gpus_per_task\', \'tasks_per_node\', \'auto_set_gpu\', \'rpc_layer\'], varargs=None, keywords=None, defaults=[\'None\', \'8888\', \'None\', \'None\', \'None\', \'True\', \'grpc\'], " diff --git a/tensorflow/tools/api/golden/v2/tensorflow.distribute.cluster_resolver.-t-p-u-cluster-resolver.pbtxt b/tensorflow/tools/api/golden/v2/tensorflow.distribute.cluster_resolver.-t-p-u-cluster-resolver.pbtxt index 8eee489df93..165f8d43ace 100644 --- a/tensorflow/tools/api/golden/v2/tensorflow.distribute.cluster_resolver.-t-p-u-cluster-resolver.pbtxt +++ b/tensorflow/tools/api/golden/v2/tensorflow.distribute.cluster_resolver.-t-p-u-cluster-resolver.pbtxt @@ -7,6 +7,14 @@ tf_class { name: "environment" mtype: "" } + member { + name: "task_id" + mtype: "" + } + member { + name: "task_type" + mtype: "" + } member_method { name: "__init__" argspec: "args=[\'self\', \'tpu\', \'zone\', \'project\', \'job_name\', \'coordinator_name\', \'coordinator_address\', \'credentials\', \'service\', \'discovery_url\'], varargs=None, keywords=None, defaults=[\'None\', 
\'None\', \'None\', \'worker\', \'None\', \'None\', \'default\', \'None\', \'None\'], "