Improve timeline logging for distributed execution.

- Add DeviceOp (e.g. GPU stream) logging to distributed execution.
  This is done on a best-effort basis: if device tracing reports
  Unavailable, execution continues without it.
- Add timeline logging of RecvBuf transfers over gRPC.
- Add a bandwidth-consumed measurement to the timeline click tile for
  RecvTensor and RecvBuf transfers (see the sketch after this list).

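A minimal sketch of the rate label computation added to the click tile, assuming
only a byte count and elapsed microseconds as inputs (the standalone RateLabel
helper is illustrative; the actual change folds this logic into
WorkerCacheLogger::RecordDataTransfer):

  #include <cstdint>
  #include <cstdio>
  #include <string>

  // Bits per microsecond equals megabits per second, so the rate follows
  // directly from the byte count and the elapsed wall time.
  std::string RateLabel(int64_t bytes, int64_t elapsed_usecs) {
    float mbs_rate = (8.0f * static_cast<float>(bytes)) / elapsed_usecs;
    char buf[32];
    if (mbs_rate >= 1000.0f) {
      std::snprintf(buf, sizeof(buf), "[%.1fGb/s] ", mbs_rate / 1000.0f);
    } else {
      std::snprintf(buf, sizeof(buf), "[%.1fMb/s] ", mbs_rate);
    }
    return std::string(buf);
  }

  // Example: 64MB received in 50ms -> RateLabel(64 * 1048576, 50000) == "[10.7Gb/s] ".
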
PiperOrigin-RevId: 220352522
A. Unique TensorFlower 2018-11-06 14:26:51 -08:00 committed by TensorFlower Gardener
parent fb2f956281
commit 0b5dd6fc99
7 changed files with 95 additions and 18 deletions


@@ -3049,7 +3049,9 @@ tf_cuda_library(
],
copts = tf_copts(),
cuda_deps = if_cuda_is_configured(tf_additional_cupti_wrapper_deps() + tf_additional_device_tracer_cuda_deps()),
visibility = ["//visibility:private"],
visibility = [
"//tensorflow:internal",
],
deps = [
":core_cpu_internal",
":lib",


@@ -15,7 +15,7 @@ filegroup(
]),
)
load("//tensorflow:tensorflow.bzl", "tf_cc_test")
load("//tensorflow:tensorflow.bzl", "tf_cc_test", "tf_cuda_library")
load("//tensorflow:tensorflow.bzl", "tf_cuda_cc_test")
load("//tensorflow:tensorflow.bzl", "tf_cuda_cc_tests")
load("//tensorflow:tensorflow.bzl", "tf_copts")
@@ -189,7 +189,7 @@ cc_library(
],
)
cc_library(
tf_cuda_library(
name = "worker",
srcs = ["worker.cc"],
hdrs = [
@@ -204,6 +204,7 @@ cc_library(
":worker_interface",
":worker_session",
"//tensorflow/core:core_cpu_internal",
"//tensorflow/core:device_tracer",
"//tensorflow/core:lib_internal",
],
)


@@ -87,6 +87,7 @@ cc_library(
"//tensorflow/core:core_cpu_internal",
"//tensorflow/core:lib",
"//tensorflow/core:lib_internal",
"//tensorflow/core:protos_all_cc",
"//tensorflow/core:worker_proto_cc",
"//tensorflow/core/distributed_runtime:tensor_coding",
"//tensorflow/core/distributed_runtime:worker_cache_logger",


@@ -33,6 +33,7 @@ limitations under the License.
#include "tensorflow/core/lib/strings/str_util.h"
#include "tensorflow/core/platform/logging.h"
#include "tensorflow/core/platform/tracing.h"
#include "tensorflow/core/protobuf/transport_options.pb.h"
#include "tensorflow/core/protobuf/worker.pb.h"
namespace tensorflow {
@@ -121,7 +122,44 @@ class GrpcRemoteWorker : public WorkerInterface {
void RecvBufAsync(CallOptions* call_opts, const RecvBufRequest* request,
RecvBufResponse* response, StatusCallback done) override {
IssueRequest(request, response, recvbuf_, std::move(done), call_opts);
int64 start_usec = Env::Default()->NowMicros();
// Type-specialized logging for this method.
bool logging_active = logger_->LoggingActive() || VLOG_IS_ON(2);
StatusCallback wrapper_done;
const StatusCallback* cb_to_use;
if (!logging_active) {
cb_to_use = &done; // No additional work to do, so just use done directly
} else {
wrapper_done = [this, request, response, done, start_usec](Status s) {
if (logger_->LoggingActive()) {
int64 end_usec = Env::Default()->NowMicros();
int64 step_id = request->step_id();
RecvBufRespExtra extra;
response->transport_options().UnpackTo(&extra);
int64 num_bytes = 0;
for (const auto& chunk : extra.tensor_content()) {
num_bytes += chunk.size();
}
int64 send_start_usec = start_usec;
// Prefer start time reported by the sender, if available.
if (response->send_start_micros()) {
send_start_usec = std::max(
start_usec, static_cast<int64>(response->send_start_micros()));
send_start_usec = std::min(send_start_usec, end_usec - 1);
}
const string& key = request->buf_rendezvous_key();
logger_->RecordDataTransfer(
step_id, send_start_usec, end_usec, key, request->src_device(),
request->dst_device(), num_bytes, "", "RecvBuf");
}
VLOG(2) << "done callback, req: " << request->DebugString()
<< " response " << response->DebugString();
done(s);
};
cb_to_use = &wrapper_done;
}
IssueRequest(request, response, recvbuf_, *cb_to_use, call_opts);
}
void CompleteGroupAsync(CallOptions* call_opts,


@@ -23,6 +23,7 @@ limitations under the License.
#include "tensorflow/core/distributed_runtime/rendezvous_mgr_interface.h"
#include "tensorflow/core/distributed_runtime/tensor_coding.h"
#include "tensorflow/core/distributed_runtime/worker_session.h"
#include "tensorflow/core/platform/device_tracer.h"
#include "tensorflow/core/platform/tracing.h"
namespace tensorflow {
@@ -179,7 +180,28 @@ void Worker::DoRunGraph(CallOptions* opts, RunGraphRequestWrapper* request,
request->exec_opts().record_timeline() ||
request->exec_opts().record_costs()) {
collector = new StepStatsCollector(response->mutable_step_stats());
// TODO(mrry,pbar): GPU tracing for distributed steps.
}
DeviceTracer* tracer = nullptr;
if (collector && request->exec_opts().record_timeline()) {
// If timeline was requested, assume we want hardware level tracing.
std::unique_ptr<DeviceTracer> trptr = CreateDeviceTracer();
if (trptr) {
tracer = trptr.release();
Status s = tracer->Start();
if (!s.ok()) {
delete tracer;
if (errors::IsUnavailable(s)) {
LOG(WARNING)
<< "Hardware tracing unavailable, continuing without it. " << s;
tracer = nullptr;
} else {
delete collector;
delete out;
done(s);
return;
}
}
}
}
CancellationManager* cm = new CancellationManager;
opts->SetCancelCallback([this, cm, step_id]() {
@@ -194,6 +216,7 @@ void Worker::DoRunGraph(CallOptions* opts, RunGraphRequestWrapper* request,
opts->ClearCancelCallback();
delete cm;
delete collector;
delete tracer;
delete out;
done(errors::Aborted("Call was aborted"));
return;
@@ -201,8 +224,8 @@ void Worker::DoRunGraph(CallOptions* opts, RunGraphRequestWrapper* request,
session->graph_mgr->ExecuteAsync(
request->graph_handle(), step_id, session.get(), request->exec_opts(),
collector, response, cm, in,
[this, step_id, response, session, cm, out, token, collector, opts,
done](Status s) {
[this, step_id, response, session, cm, out, token, collector, tracer,
opts, done](Status s) {
if (s.ok()) {
s = session->graph_mgr->RecvOutputs(step_id, out);
}
@@ -210,6 +233,15 @@ void Worker::DoRunGraph(CallOptions* opts, RunGraphRequestWrapper* request,
cancellation_manager_.DeregisterCallback(token);
delete cm;
if (tracer) {
Status tracer_status = tracer->Stop();
if (tracer_status.ok()) {
tracer_status = tracer->Collect(collector);
}
if (!tracer_status.ok()) {
LOG(ERROR) << "Bad status from tracer: " << tracer_status;
}
}
if (s.ok()) {
for (const auto& p : *out) {
const string& key = p.first;
@@ -219,6 +251,7 @@ void Worker::DoRunGraph(CallOptions* opts, RunGraphRequestWrapper* request,
}
if (collector) collector->Finalize();
delete collector;
delete tracer;
delete out;
done(s);
});


@@ -101,13 +101,18 @@ void WorkerCacheLogger::RecordDataTransfer(int64 step_id, int64 start_usecs,
const string& transfer_method_name) {
NodeExecStats* ns = new NodeExecStats;
ns->set_node_name(transfer_method_name);
int64 elapsed_usecs = end_usecs - start_usecs;
if (details.empty()) {
auto byte_string = strings::StrCat("[", bytes, "B] ");
if (bytes >= 0.1 * 1048576.0) {
byte_string = strings::Printf("[%.1fMB] ", bytes / 1048576.0);
}
auto label = strings::StrCat(byte_string, tensor_name, " from ", src_device,
" to ", dst_device);
float mbs_rate = (8.0 * static_cast<float>(bytes)) / elapsed_usecs;
auto rate_string = (mbs_rate >= 1000.0)
? strings::Printf("[%.1fGb/s] ", mbs_rate / 1000.0)
: strings::Printf("[%fMb/s] ", mbs_rate);
auto label = strings::StrCat(byte_string, rate_string, tensor_name,
" from ", src_device, " to ", dst_device);
ns->set_timeline_label(label);
} else {
ns->set_timeline_label(details);
@@ -115,13 +120,10 @@ void WorkerCacheLogger::RecordDataTransfer(int64 step_id, int64 start_usecs,
ns->set_all_start_micros(start_usecs);
ns->set_op_start_rel_micros(0);
int64 elapsed = end_usecs - start_usecs;
ns->set_op_end_rel_micros(elapsed);
ns->set_all_end_rel_micros(elapsed);
ns->set_op_end_rel_micros(elapsed_usecs);
ns->set_all_end_rel_micros(elapsed_usecs);
NodeOutput* no = ns->add_output();
no->set_slot(0);
// TODO(tucker): Maybe set the dimensions too, but then they'll
// need to be passed in.
no->mutable_tensor_description()
->mutable_allocation_description()
->set_requested_bytes(bytes);


@@ -1398,10 +1398,10 @@ class TensorArrayTest(test.TestCase):
for d in dev_stats:
if "/task:1/" in d:
self.assertTrue(
[s for s in dev_stats[d] if "/TensorArray" in s.node_name])
[s for s in dev_stats[d] if "TensorArray" == s.node_name])
else:
self.assertFalse(
[s for s in dev_stats[d] if "/TensorArray" in s.node_name])
[s for s in dev_stats[d] if "TensorArray" == s.node_name])
def testTensorArrayDisabledColocateWithFirstWriteCall(self):
with ops.device("/job:worker/task:0/cpu:0"):
@@ -1428,10 +1428,10 @@ class TensorArrayTest(test.TestCase):
for d in dev_stats:
if "/task:0/" in d and "CPU" in d: # Skip any GPU node stats
self.assertTrue(
[s for s in dev_stats[d] if "/TensorArray" in s.node_name])
[s for s in dev_stats[d] if "TensorArray" == s.node_name])
else:
self.assertFalse(
[s for s in dev_stats[d] if "/TensorArray" in s.node_name])
[s for s in dev_stats[d] if "TensorArray" == s.node_name])
@test_util.run_in_graph_and_eager_modes
def testTensorArrayIdentity(self):