tfdbg core: add core metadata to debugger data stream + better support of concurrent debugged runs

* Let the debugger send/dump an Event proto holding a JSON string in its log_message.message field (see the sketch after the list below). The JSON metadata includes:
1) An optional, client-specified global_step field that defaults to -1 if not supplied
2) A session run count
3) An executor invocation step count
4) Input names (feed keys)
5) Output names (fetched Tensor names)
6) Target node names
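
For illustration, a minimal sketch of decoding such a payload on the client side. The key names come from this change; the values are invented for the example:

import json

# Hypothetical metadata string as it would arrive in
# Event.log_message.message; all values are made up for illustration.
metadata_json = (
    '{"global_step":-1,"session_run_count":7,"executor_step_count":2,'
    '"input_names":["ph:0"],"output_names":["x:0"],"target_nodes":[]}')

metadata = json.loads(metadata_json)
assert metadata["global_step"] == -1  # -1 means the caller supplied no value
assert metadata["output_names"] == ["x:0"]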

* grpc_debug_server.EventListenerBaseServicer now takes the constructor of an EventListenerBaseStreamHandler subclass and constructs a new handler object from it for every stream. This improves support for concurrent debugged Session::Run() calls.
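
A rough sketch of the intended usage; the import path and the constructor arguments (port, handler class) are assumptions not shown in this commit, only the two class names come from it:

from tensorflow.python.debug import grpc_debug_server  # path assumed

class MyStreamHandler(grpc_debug_server.EventListenerBaseStreamHandler):
  # Per-stream state lives on the handler instance; the servicer
  # constructs one instance for every incoming gRPC stream.
  pass

# Assumed constructor signature: (server_port, stream_handler_class).
servicer = grpc_debug_server.EventListenerBaseServicer(6000, MyStreamHandler)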

* Add support for path names in grpc:// URLs, such as "grpc://localhost:6000/thread1". Different path names will lead to separate gRPC streams being opened to the same server:port, supporting concurrent debugged Session::Run() calls.
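
For example, three worker threads sharing one debug server might be kept apart like this; the host, port and thread count are assumptions, only the path convention comes from this change:

from tensorflow.core.protobuf import config_pb2
from tensorflow.python.debug import debug_utils

def make_debug_run_options(graph, thread_index):
  # Distinct paths under the same server:port open separate gRPC streams.
  run_options = config_pb2.RunOptions()
  debug_utils.watch_graph(
      run_options, graph,
      debug_urls=["grpc://localhost:6000/thread%d" % thread_index])
  return run_options
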
Change: 146896481
Shanqing Cai 2017-02-08 04:32:26 -08:00 committed by TensorFlower Gardener
parent 6bbbd7e9d2
commit aabc7972b9
14 changed files with 452 additions and 43 deletions

View File

@@ -39,6 +39,24 @@ class DebuggerStateInterface {
   // record. See the documentation of DebugNodeInserter::InsertNodes() for
   // details.
   virtual Status DecorateGraphForDebug(Graph* graph, Device* device) = 0;
+
+  // Publish metadata about the debugged Session::Run() call.
+  //
+  // Args:
+  //   global_step: A global step count supplied by the caller of
+  //     Session::Run().
+  //   session_run_count: A counter for calls to the Run() method of the
+  //     Session object.
+  //   executor_step_count: A counter for invocations of the executor charged
+  //     to serve this Session::Run() call.
+  //   input_names: Names of the input Tensors (feed keys).
+  //   output_names: Names of the fetched Tensors.
+  //   target_nodes: Names of the target nodes.
+  virtual Status PublishDebugMetadata(
+      const int64 global_step, const int64 session_run_count,
+      const int64 executor_step_count, const std::vector<string>& input_names,
+      const std::vector<string>& output_names,
+      const std::vector<string>& target_nodes) = 0;
 };
 
 typedef std::function<std::unique_ptr<DebuggerStateInterface>(

View File

@@ -397,6 +397,9 @@ Status DirectSession::Run(const RunOptions& run_options,
   ExecutorsAndKeys* executors_and_keys;
   RunStateArgs run_state_args;
 
+  Executor::Args args;
+  args.step_id = step_id_counter_.fetch_add(1);
+
   // EXPERIMENTAL: Options that allow the client to insert nodes into partition
   // graphs for debugging.
   if (!run_options.debug_options().debug_tensor_watch_opts().empty()) {
@@ -407,10 +410,15 @@ Status DirectSession::Run(const RunOptions& run_options,
   TF_RETURN_IF_ERROR(
       GetOrCreateExecutors(pool, input_tensor_names, output_names, target_nodes,
                            &executors_and_keys, &run_state_args));
+  const int64 executor_step_count = executors_and_keys->step_count.fetch_add(1);
+
+  if (run_state_args.debugger_state) {
+    TF_RETURN_IF_ERROR(run_state_args.debugger_state->PublishDebugMetadata(
+        run_options.debug_options().global_step(), args.step_id,
+        executor_step_count, input_tensor_names, output_names, target_nodes));
+  }
 
   // Create a run state and start execution.
-  Executor::Args args;
-  args.step_id = step_id_counter_.fetch_add(1);
   RunState run_state(args.step_id, &devices_);
   run_state.rendez = new IntraProcessRendezvous(device_mgr_.get());
   CancellationManager step_cancellation_manager;
@@ -450,8 +458,7 @@ Status DirectSession::Run(const RunOptions& run_options,
       options_.config.graph_options().build_cost_model();
   const int64 build_cost_model_after =
       options_.config.graph_options().build_cost_model_after();
-  int measure_step_count =
-      executors_and_keys->step_count - build_cost_model_after;
+  int measure_step_count = executor_step_count - build_cost_model_after;
   if (measure_step_count >= 0) {
     update_cost_model =
         ((measure_step_count + 1) % build_cost_model_every == 0);
@@ -527,7 +534,6 @@ Status DirectSession::Run(const RunOptions& run_options,
     // Build and return the cost model as instructed.
     mutex_lock l(executor_lock_);
-    ++executors_and_keys->step_count;
     if (update_cost_model) {
       // Build the cost model
       std::unordered_map<string, const Graph*> device_to_graph;

View File

@@ -125,7 +125,9 @@ class DirectSession : public Session {
   // library. Consider giving each partition its own function library to enable
   // per-partition rewrites.
   struct ExecutorsAndKeys {
-    int64 step_count = 0;
+    ExecutorsAndKeys() : step_count(0) {}
+
+    std::atomic_int_fast64_t step_count;
     std::unique_ptr<Graph> graph;
     NameNodeMap name_to_node;
     std::unique_ptr<FunctionLibraryDefinition> flib_def;

View File

@@ -75,6 +75,16 @@ Status DebuggerState::DecorateGraphForDebug(Graph* graph, Device* device) {
   return status;
 }
 
+Status DebuggerState::PublishDebugMetadata(
+    const int64 global_step, const int64 session_run_count,
+    const int64 executor_step_count, const std::vector<string>& input_names,
+    const std::vector<string>& output_names,
+    const std::vector<string>& target_nodes) {
+  return DebugIO::PublishDebugMetadata(global_step, session_run_count,
+                                       executor_step_count, input_names,
+                                       output_names, target_nodes, debug_urls_);
+}
+
 // static
 Status DebugNodeInserter::InsertNodes(
     const protobuf::RepeatedPtrField<DebugTensorWatch>& watches, Graph* graph,

View File

@@ -43,6 +43,17 @@ class DebuggerState : public DebuggerStateInterface {
   const protobuf::RepeatedPtrField<DebugTensorWatch>& watches;
 
+  // Publish metadata about the debugged Session::Run() call.
+  //
+  // See the doc string of DebuggerStateInterface::PublishDebugMetadata() for
+  // details.
+  Status PublishDebugMetadata(const int64 global_step,
+                              const int64 session_run_count,
+                              const int64 executor_step_count,
+                              const std::vector<string>& input_names,
+                              const std::vector<string>& output_names,
+                              const std::vector<string>& target_nodes);
+
  private:
   std::unordered_set<string> debug_urls_;
 };

View File

@@ -80,10 +80,10 @@ TEST_F(GrpcDebugTest, AttemptToSendToNonexistentGrpcAddress) {
       "foo_tensor", "DebugIdentity", tensor, Env::Default()->NowMicros(),
       {kInvalidGrpcUrl});
   ASSERT_FALSE(publish_status.ok());
-  ASSERT_NE(
-      string::npos,
-      publish_status.error_message().find(
-          "Channel at the following gRPC address is not ready: 0.0.0.0:0"));
+  ASSERT_NE(string::npos,
+            publish_status.error_message().find(
+                "Channel at the following gRPC stream URL is not ready: "
+                "grpc://0.0.0.0:0"));
   DebugIO::CloseDebugURL(kInvalidGrpcUrl);
 }

View File

@@ -98,6 +98,83 @@ const char* const DebugIO::kFileURLScheme = "file://";
 // static
 const char* const DebugIO::kGrpcURLScheme = "grpc://";
 
+// static
+Status DebugIO::PublishDebugMetadata(
+    const int64 global_step, const int64 session_run_count,
+    const int64 executor_step_count, const std::vector<string>& input_names,
+    const std::vector<string>& output_names,
+    const std::vector<string>& target_nodes,
+    const std::unordered_set<string>& debug_urls) {
+  std::ostringstream oss;
+
+  // Construct a JSON string to carry the metadata.
+  oss << "{";
+  oss << "\"global_step\":" << global_step << ",";
+  oss << "\"session_run_count\":" << session_run_count << ",";
+  oss << "\"executor_step_count\":" << executor_step_count << ",";
+  oss << "\"input_names\":[";
+  for (size_t i = 0; i < input_names.size(); ++i) {
+    oss << "\"" << input_names[i] << "\"";
+    if (i < input_names.size() - 1) {
+      oss << ",";
+    }
+  }
+  oss << "],";
+  oss << "\"output_names\":[";
+  for (size_t i = 0; i < output_names.size(); ++i) {
+    oss << "\"" << output_names[i] << "\"";
+    if (i < output_names.size() - 1) {
+      oss << ",";
+    }
+  }
+  oss << "],";
+  oss << "\"target_nodes\":[";
+  for (size_t i = 0; i < target_nodes.size(); ++i) {
+    oss << "\"" << target_nodes[i] << "\"";
+    if (i < target_nodes.size() - 1) {
+      oss << ",";
+    }
+  }
+  oss << "]";
+  oss << "}";
+
+  const string json_metadata = oss.str();
+  Event event;
+  event.set_wall_time(static_cast<double>(Env::Default()->NowMicros()));
+  LogMessage* log_message = event.mutable_log_message();
+  log_message->set_message(json_metadata);
+
+  Status status;
+  for (const string& url : debug_urls) {
+    if (str_util::Lowercase(url).find(kGrpcURLScheme) == 0) {
+      Event grpc_event;
+
+      // Determine the path (if any) in the grpc:// URL, and add it as a field
+      // of the JSON string.
+      const string address = url.substr(strlen(DebugIO::kGrpcURLScheme));
+      const string path = address.find("/") == string::npos
+                              ? ""
+                              : address.substr(address.find("/"));
+      grpc_event.set_wall_time(event.wall_time());
+      LogMessage* log_message_grpc = grpc_event.mutable_log_message();
+      log_message_grpc->set_message(
+          strings::StrCat(json_metadata.substr(0, json_metadata.size() - 1),
+                          ",\"grpc_path\":\"", path, "\"}"));
+
+      status.Update(
+          DebugGrpcIO::SendEventProtoThroughGrpcStream(grpc_event, url));
+    } else if (str_util::Lowercase(url).find(kFileURLScheme) == 0) {
+      const string dump_root_dir = url.substr(strlen(kFileURLScheme));
+      const string file_name = strings::StrCat("_tfdbg_core_metadata_",
+                                               Env::Default()->NowMicros());
+      status.Update(
+          DebugFileIO::DumpEventProtoToFile(event, dump_root_dir, file_name));
+    }
+  }
+
+  return status;
+}
+
 // static
 Status DebugIO::PublishDebugTensor(const string& tensor_name,
                                    const string& debug_op, const Tensor& tensor,
@@ -136,10 +213,8 @@ Status DebugIO::PublishDebugTensor(const string& tensor_name,
         fail_statuses.push_back(s);
       }
     } else if (str_util::Lowercase(url).find(kGrpcURLScheme) == 0) {
-      const string grpc_server_stream_addr = url.substr(strlen(kGrpcURLScheme));
       Status s = DebugGrpcIO::SendTensorThroughGrpcStream(
-          node_name, output_slot, debug_op, tensor, wall_time_us,
-          grpc_server_stream_addr);
+          node_name, output_slot, debug_op, tensor, wall_time_us, url);
 
       if (!s.ok()) {
         num_failed_urls++;
@@ -189,8 +264,7 @@ Status DebugIO::PublishGraph(const Graph& graph,
       status.Update(
           DebugFileIO::DumpEventProtoToFile(event, dump_root_dir, file_name));
     } else if (debug_url.find(kGrpcURLScheme) == 0) {
-      DebugGrpcIO::SendEventProtoThroughGrpcStream(
-          event, debug_url.substr(strlen(kGrpcURLScheme)));
+      DebugGrpcIO::SendEventProtoThroughGrpcStream(event, debug_url);
     }
   }
 
@@ -200,8 +274,7 @@ Status DebugIO::PublishGraph(const Graph& graph,
 // static
 Status DebugIO::CloseDebugURL(const string& debug_url) {
   if (debug_url.find(DebugIO::kGrpcURLScheme) == 0) {
-    return DebugGrpcIO::CloseGrpcStream(
-        debug_url.substr(strlen(DebugIO::kGrpcURLScheme)));
+    return DebugGrpcIO::CloseGrpcStream(debug_url);
   } else {
     // No-op for non-gRPC URLs.
     return Status::OK();
@@ -348,57 +421,64 @@ std::unordered_map<string, std::shared_ptr<DebugGrpcChannel>>
     DebugGrpcIO::stream_channels;
 
 // static
-Status DebugGrpcIO::SendTensorThroughGrpcStream(
-    const string& node_name, const int32 output_slot, const string& debug_op,
-    const Tensor& tensor, const uint64 wall_time_us,
-    const string& server_stream_addr) {
+Status DebugGrpcIO::SendTensorThroughGrpcStream(const string& node_name,
+                                                const int32 output_slot,
+                                                const string& debug_op,
+                                                const Tensor& tensor,
+                                                const uint64 wall_time_us,
+                                                const string& grpc_stream_url) {
   const string tensor_name = strings::StrCat(node_name, ":", output_slot);
 
   // Prepare tensor Event data to be sent.
   Event event = WrapTensorAsEvent(tensor_name, debug_op, tensor, wall_time_us);
 
-  return SendEventProtoThroughGrpcStream(event, server_stream_addr);
+  return SendEventProtoThroughGrpcStream(event, grpc_stream_url);
 }
 
 // static
 Status DebugGrpcIO::SendEventProtoThroughGrpcStream(
-    const Event& event_proto, const string& server_stream_addr) {
+    const Event& event_proto, const string& grpc_stream_url) {
+  const string addr_with_path =
+      grpc_stream_url.substr(strlen(DebugIO::kGrpcURLScheme));
+  const string server_stream_addr =
+      addr_with_path.substr(0, addr_with_path.find('/'));
   std::shared_ptr<DebugGrpcChannel> debug_grpc_channel;
   {
     mutex_lock l(streams_mu);
-    if (stream_channels.find(server_stream_addr) == stream_channels.end()) {
+    if (stream_channels.find(grpc_stream_url) == stream_channels.end()) {
       debug_grpc_channel.reset(new DebugGrpcChannel(server_stream_addr));
       if (!debug_grpc_channel->is_channel_ready()) {
        return errors::FailedPrecondition(
-            strings::StrCat("Channel at the following gRPC address is ",
-                            "not ready: ", server_stream_addr));
+            strings::StrCat("Channel at the following gRPC stream URL is ",
+                            "not ready: ", grpc_stream_url));
      }
-      stream_channels[server_stream_addr] = debug_grpc_channel;
+      stream_channels[grpc_stream_url] = debug_grpc_channel;
     } else {
-      debug_grpc_channel = stream_channels[server_stream_addr];
+      debug_grpc_channel = stream_channels[grpc_stream_url];
    }
  }
 
   bool write_ok = debug_grpc_channel->WriteEvent(event_proto);
   if (!write_ok) {
-    return errors::Cancelled(strings::StrCat("Write event to stream URL ",
-                                             server_stream_addr, "failed."));
+    return errors::Cancelled(strings::StrCat("Write event to stream URL ",
+                                             grpc_stream_url, " failed."));
  }
 
   return Status::OK();
 }
 
-Status DebugGrpcIO::CloseGrpcStream(const string& server_stream_addr) {
+Status DebugGrpcIO::CloseGrpcStream(const string& grpc_stream_url) {
   mutex_lock l(streams_mu);
 
-  if (stream_channels.find(server_stream_addr) != stream_channels.end()) {
+  if (stream_channels.find(grpc_stream_url) != stream_channels.end()) {
     // Stream of the specified address exists. Close it and remove it from
     // record.
     Status s;
-    s = stream_channels[server_stream_addr]->Close();
-    stream_channels.erase(server_stream_addr);
+    s = stream_channels[grpc_stream_url]->Close();
+    stream_channels.erase(grpc_stream_url);
     return s;
   } else {
     // Stream of the specified address does not exist. No action.

View File

@@ -32,6 +32,13 @@ Status ReadEventFromFile(const string& dump_file_path, Event* event);
 class DebugIO {
  public:
+  static Status PublishDebugMetadata(
+      const int64 global_step, const int64 session_run_count,
+      const int64 executor_step_count, const std::vector<string>& input_names,
+      const std::vector<string>& output_names,
+      const std::vector<string>& target_nodes,
+      const std::unordered_set<string>& debug_urls);
+
   // Publish a tensor to a debug target URL.
   //
   // Args:
@@ -59,7 +66,6 @@ class DebugIO {
   static Status CloseDebugURL(const string& debug_url);
 
- private:
   static const char* const kFileURLScheme;
   static const char* const kGrpcURLScheme;
 };
@@ -172,18 +178,18 @@ class DebugGrpcIO {
                                               const string& debug_op,
                                               const Tensor& tensor,
                                               const uint64 wall_time_us,
-                                              const string& server_stream_addr);
+                                              const string& grpc_stream_url);
 
   // Send an Event proto through a debug gRPC stream.
   // Thread-safety: Safe with respect to other calls to the same method and
   // calls to CloseGrpcStream().
-  static Status SendEventProtoThroughGrpcStream(
-      const Event& event_proto, const string& server_stream_addr);
+  static Status SendEventProtoThroughGrpcStream(const Event& event_proto,
+                                                const string& grpc_stream_url);
 
   // Close a gRPC stream to the given address, if it exists.
   // Thread-safety: Safe with respect to other calls to the same method and
   // calls to SendTensorThroughGrpcStream().
-  static Status CloseGrpcStream(const string& server_stream_addr);
+  static Status CloseGrpcStream(const string& grpc_stream_url);
 
  private:
   static mutex streams_mu;

View File

@@ -27,6 +27,13 @@ message DebugTensorWatch {
   // E.g., "file:///foo/tfdbg_dump", "grpc://localhost:11011"
   // Each debug op listed in debug_ops will publish its output tensor (debug
   // signal) to all URLs in debug_urls.
+  //
+  // N.B. Session::Run() supports concurrent invocations of the same inputs
+  // (feed keys), outputs and target nodes. If such concurrent invocations
+  // are to be debugged, the callers of Session::Run() must use distinct
+  // debug_urls to make sure that the streamed or dumped events do not overlap
+  // among the invocations.
+  // TODO(cais): More visible documentation of this in g3docs.
   repeated string debug_urls = 4;
 }
 
@@ -34,4 +41,9 @@ message DebugTensorWatch {
 message DebugOptions {
   // Debugging options
   repeated DebugTensorWatch debug_tensor_watch_opts = 4;
+
+  // Caller-specified global step count.
+  // Note that this is distinct from the session run count and the executor
+  // step count.
+  int64 global_step = 10;
 }
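
To see the new proto field in action, a caller could set it directly on RunOptions; a minimal sketch (the value 42 is arbitrary):

from tensorflow.core.protobuf import config_pb2

run_options = config_pb2.RunOptions()
# Carried through to the "global_step" key of the core metadata JSON.
run_options.debug_options.global_step = 42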

View File

@@ -436,6 +436,7 @@ cuda_py_test(
         "//tensorflow/python:platform_test",
         "//tensorflow/python:variables",
     ],
+    tags = ["notsan"],
 )
 
 py_test(

View File

@@ -19,6 +19,7 @@ from __future__ import division
 from __future__ import print_function
 
 import collections
+import json
 import os
 
 import numpy as np
@@ -31,6 +32,7 @@ from tensorflow.python.platform import gfile
 
 METADATA_FILE_PREFIX = "_tfdbg_"
+CORE_METADATA_TAG = "core_metadata_"
 GRAPH_FILE_TAG = "graph_"
 FETCHES_INFO_FILE_TAG = "fetches_info_"
 FEED_KEYS_INFO_FILE_TAG = "feed_keys_info_"
@@ -110,6 +112,10 @@ def parse_node_or_tensor_name(name):
     return name, None
 
 
+def _is_core_metadata_file(file_name):
+  return file_name.startswith(METADATA_FILE_PREFIX + CORE_METADATA_TAG)
+
+
 def _is_graph_file(file_name):
   return file_name.startswith(METADATA_FILE_PREFIX + GRAPH_FILE_TAG)
 
@@ -274,6 +280,20 @@ def has_inf_or_nan(datum, tensor):
     return False
 
 
+def extract_core_metadata_from_event_proto(event):
+  json_metadata = json.loads(event.log_message.message)
+  core_metadata = collections.namedtuple("CoreMetadata", [
+      "global_step", "session_run_count", "executor_step_count", "input_names",
+      "output_names", "target_nodes"
+  ])
+  return core_metadata(json_metadata["global_step"],
+                       json_metadata["session_run_count"],
+                       json_metadata["executor_step_count"],
+                       json_metadata["input_names"],
+                       json_metadata["output_names"],
+                       json_metadata["target_nodes"])
+
+
 class DebugTensorDatum(object):
   """A single tensor dumped by TensorFlow Debugger (tfdbg).
@@ -455,6 +475,7 @@ class DebugDumpDir(object):
     if not gfile.IsDirectory(dump_root):
       raise IOError("Dump root directory %s does not exist" % dump_root)
 
+    self._core_metadata = None
     self._load_dumps(dump_root)
     self._create_tensor_watch_maps()
     self._load_partition_graphs(partition_graphs, validate)
@@ -498,6 +519,9 @@ class DebugDumpDir(object):
     for root, _, files in gfile.Walk(self._dump_root):
       for f in files:
         if f.startswith(METADATA_FILE_PREFIX):
+          if _is_core_metadata_file(f):
+            self._load_core_metadata(
+                os.path.join(self._dump_root, root, f))
           if _is_graph_file(f):
             self._dump_graph_file_paths.append(
                 os.path.join(self._dump_root, root, f))
@@ -526,6 +550,12 @@ class DebugDumpDir(object):
     else:
       self._t0 = None
 
+  def _load_core_metadata(self, event_file_path):
+    event = event_pb2.Event()
+    with gfile.Open(event_file_path, "rb") as f:
+      event.ParseFromString(f.read())
+      self._core_metadata = extract_core_metadata_from_event_proto(event)
+
   def _dump_file_name_to_datum(self, dir_name, file_name):
     """Obtain a DebugTensorDatum from the directory and file name.
@@ -587,6 +617,37 @@ class DebugDumpDir(object):
     for op in self._python_graph.get_operations():
       self._node_traceback[op.name] = op.traceback
 
+  @property
+  def core_metadata(self):
+    """Metadata about the `Session.run()` call from the core runtime.
+
+    Of the three counters available in the return value, `global_step` is
+    supplied by the caller of the debugged `Session.run()`, while
+    `session_run_count` and `executor_step_count` are determined automatically
+    by the state of the core runtime. For the same fetch list, feed keys and
+    debug tensor watch options, the same executor will be used and
+    `executor_step_count` should increase by one at a time. However, runs with
+    different fetch lists, feed keys and debug tensor watch options that all
+    share the same `Session` object can lead to gaps in `session_run_count`.
+
+    Returns:
+      If core metadata are loaded, a `namedtuple` with the fields:
+        `global_step`: A global step count supplied by the caller of
+          `Session.run()`. It is optional to the caller. If the caller did not
+          supply this parameter, its value will be -1.
+        `session_run_count`: A counter for Run() calls to the underlying
+          TensorFlow `Session` object.
+        `executor_step_count`: A counter for invocations of a given runtime
+          executor. The same executor is re-used for the same fetched tensors,
+          target nodes, input feed keys and debug tensor watch options.
+        `input_names`: Names of the input (feed) Tensors.
+        `output_names`: Names of the output (fetched) Tensors.
+        `target_nodes`: Names of the target nodes.
+      If the core metadata have not been loaded, `None`.
+    """
+    return self._core_metadata
+
   @property
   def dumped_tensor_data(self):
     return self._dump_tensor_data
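
A minimal usage sketch of the new property, assuming a debugged run has already dumped to a hypothetical directory:

from tensorflow.python.debug import debug_data

# "/tmp/tfdbg_dump" is a placeholder for a real dump root directory.
dump = debug_data.DebugDumpDir("/tmp/tfdbg_dump")
meta = dump.core_metadata  # None if no core metadata file was dumped
if meta is not None:
  print(meta.global_step, meta.session_run_count, meta.executor_step_count)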

View File

@@ -27,7 +27,8 @@ def add_debug_tensor_watch(run_options,
                            node_name,
                            output_slot=0,
                            debug_ops="DebugIdentity",
-                           debug_urls=None):
+                           debug_urls=None,
+                           global_step=-1):
   """Add watch on a `Tensor` to `RunOptions`.
 
   N.B.: Under certain circumstances, the `Tensor` may not be actually watched
@@ -42,9 +43,12 @@ def add_debug_tensor_watch(run_options,
       `list` of `str` with only one element.
     debug_urls: (`str` or `list` of `str`) URL(s) to send debug values to,
       e.g., `file:///tmp/tfdbg_dump_1`, `grpc://localhost:12345`.
+    global_step: (`int`) Optional global_step count for this debug tensor
+      watch.
   """
 
   watch_opts = run_options.debug_options.debug_tensor_watch_opts
+  run_options.debug_options.global_step = global_step
 
   watch = watch_opts.add()
   watch.node_name = node_name
@@ -67,7 +71,8 @@ def watch_graph(run_options,
                 debug_ops="DebugIdentity",
                 debug_urls=None,
                 node_name_regex_whitelist=None,
-                op_type_regex_whitelist=None):
+                op_type_regex_whitelist=None,
+                global_step=-1):
   """Add debug watches to `RunOptions` for a TensorFlow graph.
 
   To watch all `Tensor`s on the graph, let both `node_name_regex_whitelist`
@@ -93,6 +98,8 @@ def watch_graph(run_options,
       are set, the two filtering operations will occur in a logical `AND`
       relation. In other words, a node will be included if and only if it
       hits both whitelists.
+    global_step: (`int`) Optional global_step count for this debug tensor
+      watch.
   """
 
   if isinstance(debug_ops, str):
@@ -128,7 +135,8 @@ def watch_graph(run_options,
           node_name,
           output_slot=slot,
           debug_ops=debug_ops,
-          debug_urls=debug_urls)
+          debug_urls=debug_urls,
+          global_step=global_step)
 
 
 def watch_graph_with_blacklists(run_options,
@@ -136,7 +144,8 @@ def watch_graph_with_blacklists(run_options,
                                 debug_ops="DebugIdentity",
                                 debug_urls=None,
                                 node_name_regex_blacklist=None,
-                                op_type_regex_blacklist=None):
+                                op_type_regex_blacklist=None,
+                                global_step=-1):
   """Add debug tensor watches, blacklisting nodes and op types.
 
   This is similar to `watch_graph()`, but the node names and op types are
@@ -161,6 +170,8 @@ def watch_graph_with_blacklists(run_options,
       relation. In other words, a node will be excluded if it hits either of
       the two blacklists; a node will be included if and only if it hits
       neither of the blacklists.
+    global_step: (`int`) Optional global_step count for this debug tensor
+      watch.
   """
 
   if isinstance(debug_ops, str):
@@ -196,4 +207,5 @@ def watch_graph_with_blacklists(run_options,
           node_name,
           output_slot=slot,
          debug_ops=debug_ops,
-          debug_urls=debug_urls)
+          debug_urls=debug_urls,
+          global_step=global_step)
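
Putting the new keyword argument to use, a caller might thread its own training-step counter through watch_graph; a sketch with assumed names and a placeholder dump root:

from tensorflow.core.protobuf import config_pb2
from tensorflow.python.debug import debug_utils

def debugged_run_options(graph, train_step):
  # train_step is a caller-maintained counter; it surfaces later as
  # dump.core_metadata.global_step.
  run_options = config_pb2.RunOptions(output_partition_graphs=True)
  debug_utils.watch_graph(
      run_options, graph,
      debug_urls=["file:///tmp/tfdbg_dump"],  # placeholder dump root
      global_step=train_step)
  return run_options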

View File

@@ -18,6 +18,8 @@ from __future__ import division
 from __future__ import print_function
 
 import os
+import shutil
+import tempfile
 
 from tensorflow.core.protobuf import config_pb2
 from tensorflow.python.client import session
@@ -25,6 +27,7 @@ from tensorflow.python.debug import debug_data
 from tensorflow.python.debug import debug_utils
 from tensorflow.python.debug import session_debug_testlib
 from tensorflow.python.framework import constant_op
+from tensorflow.python.framework import ops
 from tensorflow.python.ops import math_ops
 from tensorflow.python.ops import variables
 from tensorflow.python.platform import googletest
@@ -108,5 +111,25 @@ class SessionDebugTest(session_debug_testlib.SessionDebugTestBase):
         dump.get_rel_timestamps("%s/read" % v_name, 0,
                                 "DebugIdentity")[0], 0)
 
 
+class SessionDebugConcurrentTest(
+    session_debug_testlib.DebugConcurrentRunCallsTest):
+
+  def setUp(self):
+    self._num_concurrent_runs = 3
+    self._dump_roots = []
+    for _ in range(self._num_concurrent_runs):
+      self._dump_roots.append(tempfile.mkdtemp())
+
+  def tearDown(self):
+    ops.reset_default_graph()
+    for dump_root in self._dump_roots:
+      if os.path.isdir(dump_root):
+        shutil.rmtree(dump_root)
+
+  def _get_concurrent_debug_urls(self):
+    return [("file://%s" % dump_root) for dump_root in self._dump_roots]
+
+
 if __name__ == "__main__":
   googletest.main()

View File

@@ -18,14 +18,18 @@ from __future__ import division
 from __future__ import print_function
 
 import collections
+import functools
+import glob
 import os
 import shutil
 import tempfile
+import threading
 
 import numpy as np
 from six.moves import xrange  # pylint: disable=redefined-builtin
 
 from tensorflow.core.protobuf import config_pb2
+from tensorflow.core.util import event_pb2
 from tensorflow.python.client import session
 from tensorflow.python.debug import debug_data
 from tensorflow.python.debug import debug_utils
@@ -134,6 +138,15 @@ class SessionDebugTestBase(test_util.TensorFlowTestCase):
     results = self._generate_dump_from_simple_addition_graph()
     self.assertTrue(results.dump.loaded_partition_graphs())
 
+    # Since global_step is not explicitly specified, it should take its default
+    # value: -1.
+    self.assertEqual(-1, results.dump.core_metadata.global_step)
+    self.assertGreaterEqual(results.dump.core_metadata.session_run_count, 0)
+    self.assertGreaterEqual(results.dump.core_metadata.executor_step_count, 0)
+    self.assertEqual([], results.dump.core_metadata.input_names)
+    self.assertEqual([results.w.name], results.dump.core_metadata.output_names)
+    self.assertEqual([], results.dump.core_metadata.target_nodes)
+
     # Verify the dumped tensor values for u and v.
     self.assertEqual(2, results.dump.size)
 
@@ -869,6 +882,63 @@ class SessionDebugTestBase(test_util.TensorFlowTestCase):
     self.assertAllClose([0, 0, 1, 2, 2],
                         unique_x_slot_1_dumps[0].get_tensor())
 
+  def testSuccessiveDebuggingRunsIncreasesCounters(self):
+    """Repeated Session.run() calls with debugging should increment counters."""
+    with session.Session() as sess:
+      ph = array_ops.placeholder(dtypes.float32, name="successive/ph")
+      x = array_ops.transpose(ph, name="mismatch/x")
+      y = array_ops.squeeze(ph, name="mismatch/y")
+
+      run_options = config_pb2.RunOptions(output_partition_graphs=True)
+      debug_utils.watch_graph(
+          run_options, sess.graph, debug_urls=self._debug_urls(), global_step=1)
+      sess.run(x, feed_dict={ph: np.array([[7.0, 8.0]])}, options=run_options)
+
+      dump1 = debug_data.DebugDumpDir(self._dump_root)
+      self.assertEqual(1, dump1.core_metadata.global_step)
+      self.assertGreaterEqual(dump1.core_metadata.session_run_count, 0)
+      self.assertEqual(0, dump1.core_metadata.executor_step_count)
+      self.assertEqual([ph.name], dump1.core_metadata.input_names)
+      self.assertEqual([x.name], dump1.core_metadata.output_names)
+      self.assertEqual([], dump1.core_metadata.target_nodes)
+      shutil.rmtree(self._dump_root)
+
+      run_options = config_pb2.RunOptions(output_partition_graphs=True)
+      debug_utils.watch_graph(
+          run_options, sess.graph, debug_urls=self._debug_urls(), global_step=2)
+
+      # Calling run() with the same feed, same output and same debug watch
+      # options should increment both session_run_count and
+      # executor_step_count.
+      sess.run(x, feed_dict={ph: np.array([[7.0, 8.0]])}, options=run_options)
+      dump2 = debug_data.DebugDumpDir(self._dump_root)
+      self.assertEqual(2, dump2.core_metadata.global_step)
+      self.assertEqual(dump1.core_metadata.session_run_count + 1,
+                       dump2.core_metadata.session_run_count)
+      self.assertEqual(dump1.core_metadata.executor_step_count + 1,
+                       dump2.core_metadata.executor_step_count)
+      self.assertEqual([ph.name], dump2.core_metadata.input_names)
+      self.assertEqual([x.name], dump2.core_metadata.output_names)
+      self.assertEqual([], dump2.core_metadata.target_nodes)
+      shutil.rmtree(self._dump_root)
+
+      run_options = config_pb2.RunOptions(output_partition_graphs=True)
+      debug_utils.watch_graph(
+          run_options, sess.graph, debug_urls=self._debug_urls(), global_step=3)
+
+      # Calling run() with a different output should increment
+      # session_run_count, but not executor_step_count.
+      sess.run(y, feed_dict={ph: np.array([[7.0, 8.0]])}, options=run_options)
+      dump3 = debug_data.DebugDumpDir(self._dump_root)
+      self.assertEqual(3, dump3.core_metadata.global_step)
+      self.assertEqual(dump2.core_metadata.session_run_count + 1,
+                       dump3.core_metadata.session_run_count)
+      self.assertEqual(0, dump3.core_metadata.executor_step_count)
+      self.assertEqual([ph.name], dump3.core_metadata.input_names)
+      self.assertEqual([y.name], dump3.core_metadata.output_names)
+      self.assertEqual([], dump3.core_metadata.target_nodes)
+
   def testDebuggingDuringOpError(self):
     """Test the debug tensor dumping when error occurs in graph runtime."""
 
@@ -894,6 +964,12 @@ class SessionDebugTestBase(test_util.TensorFlowTestCase):
 
     dump = debug_data.DebugDumpDir(self._dump_root)
 
+    self.assertGreaterEqual(dump.core_metadata.session_run_count, 0)
+    self.assertGreaterEqual(dump.core_metadata.executor_step_count, 0)
+    self.assertEqual([ph.name], dump.core_metadata.input_names)
+    self.assertEqual([y.name], dump.core_metadata.output_names)
+    self.assertEqual([], dump.core_metadata.target_nodes)
+
     # Despite the fact that the run() call errored out and partition_graphs
     # are not available via run_metadata, the partition graphs should still
     # have been loaded from the dump directory.
@@ -1045,5 +1121,96 @@ class SessionDebugTestBase(test_util.TensorFlowTestCase):
       self.assertIsInstance(trace, tuple)
 
 
+class DebugConcurrentRunCallsTest(test_util.TensorFlowTestCase):
+  """Test for debugging concurrent Session.run() calls."""
+
+  def _get_concurrent_debug_urls(self):
+    """Abstract method to generate debug URLs for concurrent debugged runs."""
+    raise NotImplementedError(
+        "_get_concurrent_debug_urls is not implemented in the base test class")
+
+  def testDebugConcurrentVariableUpdates(self):
+    if test.is_gpu_available():
+      self.skipTest("No testing concurrent runs on a single GPU.")
+
+    with session.Session() as sess:
+      v = variables.Variable(30.0, name="v")
+      constants = []
+      for i in xrange(self._num_concurrent_runs):
+        constants.append(constant_op.constant(1.0, name="c%d" % i))
+      incs = [
+          state_ops.assign_add(
+              v, c, use_locking=True, name=("inc%d" % i))
+          for (i, c) in enumerate(constants)
+      ]
+      sess.run(v.initializer)
+
+      concurrent_debug_urls = self._get_concurrent_debug_urls()
+
+      def inc_job(index):
+        run_options = config_pb2.RunOptions(output_partition_graphs=True)
+        debug_utils.watch_graph(
+            run_options, sess.graph, debug_urls=concurrent_debug_urls[index])
+        for _ in xrange(100):
+          sess.run(incs[index], options=run_options)
+
+      inc_threads = []
+      for index in xrange(self._num_concurrent_runs):
+        inc_thread = threading.Thread(target=functools.partial(inc_job, index))
+        inc_thread.start()
+        inc_threads.append(inc_thread)
+      for inc_thread in inc_threads:
+        inc_thread.join()
+
+      self.assertAllClose(30.0 + 1.0 * self._num_concurrent_runs * 100,
+                          sess.run(v))
+
+      all_session_run_counts = []
+      for index in xrange(self._num_concurrent_runs):
+        dump = debug_data.DebugDumpDir(self._dump_roots[index])
+        self.assertTrue(dump.loaded_partition_graphs())
+
+        v_data = dump.get_tensors("v", 0, "DebugIdentity")
+        self.assertEqual(100, len(v_data))
+
+        # Examine all the core metadata files.
+        core_metadata_files = glob.glob(
+            os.path.join(self._dump_roots[index], "_tfdbg_core*"))
+
+        timestamps = []
+        session_run_counts = []
+        executor_step_counts = []
+        for core_metadata_file in core_metadata_files:
+          with open(core_metadata_file, "rb") as f:
+            event = event_pb2.Event()
+            event.ParseFromString(f.read())
+            core_metadata = (
+                debug_data.extract_core_metadata_from_event_proto(event))
+            timestamps.append(event.wall_time)
+            session_run_counts.append(core_metadata.session_run_count)
+            executor_step_counts.append(core_metadata.executor_step_count)
+
+        all_session_run_counts.extend(session_run_counts)
+
+        # Assert that executor_step_count increases by one at a time.
+        executor_step_counts = zip(timestamps, executor_step_counts)
+        executor_step_counts = sorted(executor_step_counts, key=lambda x: x[0])
+        for i in xrange(len(executor_step_counts) - 1):
+          self.assertEqual(executor_step_counts[i][1] + 1,
+                           executor_step_counts[i + 1][1])
+
+        # Assert that session_run_count increases monotonically.
+        session_run_counts = zip(timestamps, session_run_counts)
+        session_run_counts = sorted(session_run_counts, key=lambda x: x[0])
+        for i in xrange(len(session_run_counts) - 1):
+          self.assertGreater(session_run_counts[i + 1][1],
+                             session_run_counts[i][1])
+
+      # Assert that the session_run_counts from the concurrent run() calls
+      # are all unique.
+      self.assertEqual(len(all_session_run_counts),
+                       len(set(all_session_run_counts)))
+
+
 if __name__ == "__main__":
   googletest.main()