[tfdbg] Add C++ implementation of DebugEventsWriter

PiperOrigin-RevId: 272056762
This commit is contained in:
Shanqing Cai 2019-09-30 13:46:34 -07:00 committed by TensorFlower Gardener
parent 00979a1a95
commit 49be36bc8f
4 changed files with 1373 additions and 22 deletions

View File

@ -1830,6 +1830,7 @@ filegroup(
"lib/jpeg/**/*",
"lib/png/**/*",
"lib/gif/**/*",
"util/debug_events_writer.*",
"util/events_writer.*",
"util/stats_calculator.*",
"util/reporter.*",
@ -4096,6 +4097,7 @@ tf_cc_tests(
"graph/validate_test.cc",
"util/bcast_test.cc",
"util/command_line_flags_test.cc",
"util/debug_events_writer_test.cc",
"util/device_name_utils_test.cc",
"util/dump_graph_test.cc",
"util/equal_graph_def_test.cc",

View File

@ -0,0 +1,466 @@
/* Copyright 2019 The TensorFlow Authors. All Rights Reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
==============================================================================*/
#include "tensorflow/core/util/debug_events_writer.h"

#include <utility>

#include "tensorflow/core/lib/io/path.h"
#include "tensorflow/core/lib/strings/strcat.h"
#include "tensorflow/core/lib/strings/stringprintf.h"
#include "tensorflow/core/platform/host_info.h"
#include "tensorflow/core/public/version.h"
namespace tensorflow {
namespace tfdbg {
namespace {
// Stamps `debug_event` with the current time reported by `env`, converted
// from microseconds to seconds. An event whose wall time is already non-zero
// is left untouched.
void MaybeSetDebugEventTimestamp(DebugEvent* debug_event, Env* env) {
  const bool already_stamped = debug_event->wall_time() != 0;
  if (already_stamped) {
    return;
  }
  debug_event->set_wall_time(env->NowMicros() / 1e6);
}
} // namespace
// Constructs a writer bound to `file_path`, using the default Env.
// No file is opened here: the file is created by Init(), which
// WriteSerializedDebugEvent() also calls when no record writer exists yet.
SingleDebugEventFileWriter::SingleDebugEventFileWriter(const string& file_path)
: env_(Env::Default()),
file_path_(file_path),
num_outstanding_events_(0),
writer_mu_() {}
// Creates the file at `file_path_` and wraps it in a RecordWriter.
//
// Returns OK immediately if a record writer already exists. Otherwise returns
// the (context-annotated) error from creating the writable file, or OK once
// the record writer is in place and the outstanding-event counter is zeroed.
Status SingleDebugEventFileWriter::Init() {
  if (record_writer_ != nullptr) {
    // TODO(cais): We currently don't check for file deletion. When the need
    // arises, check and fix it.
    return Status::OK();
  }

  TF_RETURN_WITH_CONTEXT_IF_ERROR(
      env_->NewWritableFile(file_path_, &writable_file_),
      "Creating writable file ", file_path_);

  // The record writer keeps a raw pointer to writable_file_, so the file must
  // stay alive for as long as the record writer does. A `new` expression
  // never yields nullptr, so no null check is needed after the reset.
  record_writer_.reset(new io::RecordWriter(writable_file_.get()));

  num_outstanding_events_.store(0);
  VLOG(1) << "Successfully opened debug events file: " << file_path_;
  return Status::OK();
}
// Appends one serialized DebugEvent as a record to the file.
//
// Lazily calls Init() when no record writer exists yet. Because the function
// returns void, failures cannot be propagated to the caller; they are logged
// instead of being silently discarded.
void SingleDebugEventFileWriter::WriteSerializedDebugEvent(
    StringPiece debug_event_str) {
  if (record_writer_ == nullptr) {
    if (!Init().ok()) {
      LOG(ERROR) << "Write failed because file could not be opened.";
      return;
    }
  }
  // Counted before the write so that a later Flush() does not take its
  // "nothing outstanding" early exit.
  num_outstanding_events_.fetch_add(1);
  Status write_status;
  {
    mutex_lock l(writer_mu_);
    write_status = record_writer_->WriteRecord(debug_event_str);
  }
  if (!write_status.ok()) {
    LOG(ERROR) << "Failed to write debug event to " << file_path_ << ": "
               << write_status;
  }
}
// Flushes the record writer and syncs the underlying file.
//
// Returns OK without touching the file when the outstanding-event counter is
// zero. Returns an Unknown error if events are outstanding but there is no
// writable file. On success the outstanding-event counter is reset to zero;
// on a flush or sync error the counter is left unchanged.
Status SingleDebugEventFileWriter::Flush() {
// Snapshot of the counter; also used in the error messages below.
const int num_outstanding = num_outstanding_events_.load();
if (num_outstanding == 0) {
return Status::OK();
}
if (writable_file_ == nullptr) {
return errors::Unknown("Unexpected NULL file for path: ", file_path_);
}
{
// Only the record-writer flush is performed under writer_mu_.
mutex_lock l(writer_mu_);
TF_RETURN_WITH_CONTEXT_IF_ERROR(record_writer_->Flush(), "Failed to flush ",
num_outstanding, " debug events to ",
file_path_);
}
TF_RETURN_WITH_CONTEXT_IF_ERROR(writable_file_->Sync(), "Failed to sync ",
num_outstanding, " debug events to ",
file_path_);
num_outstanding_events_.store(0);
return Status::OK();
}
// Flushes pending events, then closes and releases the underlying file.
//
// The returned status is the flush status, unless closing the file itself
// fails, in which case the close error takes precedence. The outstanding-event
// counter is always zeroed.
Status SingleDebugEventFileWriter::Close() {
  Status result = Flush();
  if (writable_file_ != nullptr) {
    const Status file_close_status = writable_file_->Close();
    if (!file_close_status.ok()) {
      result = file_close_status;
    }
    // Drop the record writer before the file it points into.
    record_writer_.reset(nullptr);
    writable_file_.reset(nullptr);
  }
  num_outstanding_events_ = 0;
  return result;
}
// Returns a copy of the file path this writer was constructed with.
const string SingleDebugEventFileWriter::FileName() { return file_path_; }
// Definition of the static mutex that is locked by the two-argument
// GetDebugEventsWriter() factory method.
mutex DebugEventsWriter::factory_mu_(LINKER_INITIALIZED);
// Calls Close() on destruction; any error status from Close() is discarded.
DebugEventsWriter::~DebugEventsWriter() { Close().IgnoreError(); }
// Returns the DebugEventsWriter registered for `dump_root`, creating and
// registering one with `cyclic_buffer_size` if none exists yet. When a writer
// for `dump_root` already exists it is returned as-is and `cyclic_buffer_size`
// is not consulted. The lookup and insertion happen under factory_mu_.
// static
DebugEventsWriter* DebugEventsWriter::GetDebugEventsWriter(
    const string& dump_root, int64 cyclic_buffer_size) {
  mutex_lock l(DebugEventsWriter::factory_mu_);
  auto* pool = DebugEventsWriter::GetDebugEventsWriterMap();
  auto entry = pool->find(dump_root);
  if (entry == pool->end()) {
    entry = pool->emplace(dump_root,
                          std::unique_ptr<DebugEventsWriter>(
                              new DebugEventsWriter(dump_root,
                                                    cyclic_buffer_size)))
                .first;
  }
  return entry->second.get();
}
// Convenience overload: forwards to the two-argument factory method with
// kDefaultCyclicBufferSize as the cyclic buffer size.
// static
DebugEventsWriter* DebugEventsWriter::GetDebugEventsWriter(
const string& dump_root) {
return DebugEventsWriter::GetDebugEventsWriter(dump_root,
kDefaultCyclicBufferSize);
}
// Creates the dump root directory (if needed) and opens all debug-event files.
//
// The whole body runs under initialization_mu_ and is a no-op returning OK
// once initialization has succeeded. The shared file prefix is built from
// kFileNamePrefix, the current time in whole seconds, and the host name. The
// metadata file receives a single DebugEvent carrying the TensorFlow version
// and the file-format version, and is flushed right away.
//
// Returns the first error encountered; is_initialized_ is only set when every
// step succeeded.
Status DebugEventsWriter::Init() {
  mutex_lock l(initialization_mu_);

  // TODO(cais): We currently don't check for file deletion. When the need
  // arises, check and fix file deletion.
  if (is_initialized_) {
    return Status::OK();
  }

  if (!env_->IsDirectory(dump_root_).ok()) {
    TF_RETURN_WITH_CONTEXT_IF_ERROR(env_->RecursivelyCreateDir(dump_root_),
                                    "Failed to create directory ", dump_root_);
  }

  // Whole seconds since the epoch. Integer division avoids the round trip
  // through double, and the cast to long long matches the %lld conversion
  // regardless of how int64 is typedef'ed on the platform.
  const long long time_in_seconds =
      static_cast<long long>(env_->NowMicros() / 1000000);
  file_prefix_ = io::JoinPath(
      dump_root_, strings::Printf("%s.%010lld.%s", kFileNamePrefix,
                                  time_in_seconds, port::Hostname().c_str()));

  TF_RETURN_IF_ERROR(InitNonMetadataFile(SOURCE_FILES));
  TF_RETURN_IF_ERROR(InitNonMetadataFile(STACK_FRAMES));
  TF_RETURN_IF_ERROR(InitNonMetadataFile(GRAPHS));

  // The metadata file should be created. Resetting the unique_ptr also
  // disposes of any writer left over from before; `new` never yields nullptr,
  // so no null check is required afterwards.
  const string metadata_filename = GetFileNameInternal(METADATA);
  metadata_writer_.reset(new SingleDebugEventFileWriter(metadata_filename));

  DebugEvent debug_event;
  DebugMetadata* metadata = debug_event.mutable_debug_metadata();
  metadata->set_tensorflow_version(TF_VERSION_STRING);
  metadata->set_file_version(
      strings::Printf("%s%d", kVersionPrefix, kCurrentFormatVersion));
  SerializeAndWriteDebugEvent(&debug_event, METADATA);
  TF_RETURN_WITH_CONTEXT_IF_ERROR(
      metadata_writer_->Flush(), "Failed to flush debug event metadata writer");

  TF_RETURN_IF_ERROR(InitNonMetadataFile(EXECUTION));
  TF_RETURN_IF_ERROR(InitNonMetadataFile(GRAPH_EXECUTION_TRACES));
  is_initialized_ = true;
  return Status::OK();
}
// Wraps `source_file` in a DebugEvent and writes it through the SOURCE_FILES
// writer. The pointer is handed to the DebugEvent via set_allocated_*, so the
// caller must not use or delete `source_file` afterwards.
void DebugEventsWriter::WriteSourceFile(SourceFile* source_file) {
DebugEvent debug_event;
debug_event.set_allocated_source_file(source_file);
SerializeAndWriteDebugEvent(&debug_event, SOURCE_FILES);
}
// Wraps `stack_frame_with_id` in a DebugEvent and writes it through the
// STACK_FRAMES writer. The pointer is handed to the DebugEvent via
// set_allocated_*, so the caller must not use or delete it afterwards.
void DebugEventsWriter::WriteStackFrameWithId(
StackFrameWithId* stack_frame_with_id) {
DebugEvent debug_event;
debug_event.set_allocated_stack_frame_with_id(stack_frame_with_id);
SerializeAndWriteDebugEvent(&debug_event, STACK_FRAMES);
}
// Wraps `graph_op_creation` in a DebugEvent and writes it through the GRAPHS
// writer. The pointer is handed to the DebugEvent via set_allocated_*, so the
// caller must not use or delete it afterwards.
void DebugEventsWriter::WriteGraphOpCreation(
GraphOpCreation* graph_op_creation) {
DebugEvent debug_event;
debug_event.set_allocated_graph_op_creation(graph_op_creation);
SerializeAndWriteDebugEvent(&debug_event, GRAPHS);
}
// Wraps `debugged_graph` in a DebugEvent and writes it through the GRAPHS
// writer (the same writer used for graph-op-creation events). The pointer is
// handed to the DebugEvent via set_allocated_*, so the caller must not use or
// delete it afterwards.
void DebugEventsWriter::WriteDebuggedGraph(DebuggedGraph* debugged_graph) {
DebugEvent debug_event;
debug_event.set_allocated_debugged_graph(debugged_graph);
SerializeAndWriteDebugEvent(&debug_event, GRAPHS);
}
// Records an Execution event, taking ownership of `execution`.
//
// With a non-positive cyclic buffer size the event is serialized and written
// to the EXECUTION file right away. Otherwise the event is timestamped and
// appended to the in-memory cyclic buffer, evicting the oldest entry once the
// buffer exceeds cyclic_buffer_size_; buffered events reach the file only in
// FlushExecutionFiles().
void DebugEventsWriter::WriteExecution(Execution* execution) {
  if (cyclic_buffer_size_ <= 0) {
    // No cyclic-buffer behavior.
    DebugEvent debug_event;
    debug_event.set_allocated_execution(execution);
    SerializeAndWriteDebugEvent(&debug_event, EXECUTION);
  } else {
    // Cyclic buffer behavior.
    DebugEvent debug_event;
    MaybeSetDebugEventTimestamp(&debug_event, env_);
    debug_event.set_allocated_execution(execution);
    mutex_lock l(execution_buffer_mu_);
    // Move rather than copy: the local event is discarded right after, so a
    // deep copy of the Execution payload would be wasted work.
    execution_buffer_.push_back(std::move(debug_event));
    // cyclic_buffer_size_ is known to be positive in this branch, so the
    // conversion to size_t is lossless.
    if (execution_buffer_.size() > static_cast<size_t>(cyclic_buffer_size_)) {
      execution_buffer_.pop_front();
    }
  }
}
// Records a GraphExecutionTrace event, taking ownership of
// `graph_execution_trace`.
//
// With a non-positive cyclic buffer size the event is serialized and written
// to the GRAPH_EXECUTION_TRACES file right away. Otherwise the event is
// timestamped and appended to the in-memory cyclic buffer, evicting the oldest
// entry once the buffer exceeds cyclic_buffer_size_; buffered events reach the
// file only in FlushExecutionFiles().
void DebugEventsWriter::WriteGraphExecutionTrace(
    GraphExecutionTrace* graph_execution_trace) {
  if (cyclic_buffer_size_ <= 0) {
    // No cyclic-buffer behavior.
    DebugEvent debug_event;
    debug_event.set_allocated_graph_execution_trace(graph_execution_trace);
    SerializeAndWriteDebugEvent(&debug_event, GRAPH_EXECUTION_TRACES);
  } else {
    // Cyclic buffer behavior.
    DebugEvent debug_event;
    MaybeSetDebugEventTimestamp(&debug_event, env_);
    debug_event.set_allocated_graph_execution_trace(graph_execution_trace);
    mutex_lock l(graph_execution_trace_buffer_mu_);
    // Move rather than copy: the local event is discarded right after, so a
    // deep copy of the trace payload would be wasted work.
    graph_execution_trace_buffer_.push_back(std::move(debug_event));
    // cyclic_buffer_size_ is known to be positive in this branch, so the
    // conversion to size_t is lossless.
    if (graph_execution_trace_buffer_.size() >
        static_cast<size_t>(cyclic_buffer_size_)) {
      graph_execution_trace_buffer_.pop_front();
    }
  }
}
// Flushes the source-files, stack-frames and graphs writers, in that order.
//
// Init() is called first, so calling this on a writer that has not been
// initialized yet initializes it. Writers that are null are skipped. Returns
// the first error encountered; later writers are not flushed in that case.
Status DebugEventsWriter::FlushNonExecutionFiles() {
TF_RETURN_IF_ERROR(Init());
if (source_files_writer_ != nullptr) {
TF_RETURN_IF_ERROR(source_files_writer_->Flush());
}
if (stack_frames_writer_ != nullptr) {
TF_RETURN_IF_ERROR(stack_frames_writer_->Flush());
}
if (graphs_writer_ != nullptr) {
TF_RETURN_IF_ERROR(graphs_writer_->Flush());
}
return Status::OK();
}
// Drains the cyclic buffers into their files and flushes the execution and
// graph-execution-traces writers.
//
// Init() is called first. For each of the two writers that is non-null: when
// cyclic buffering is enabled (cyclic_buffer_size_ > 0), every buffered event
// is written out front-to-back and removed from the buffer while the matching
// buffer mutex is held; the writer is then flushed. Returns the first error
// encountered.
Status DebugEventsWriter::FlushExecutionFiles() {
TF_RETURN_IF_ERROR(Init());
if (execution_writer_ != nullptr) {
if (cyclic_buffer_size_ > 0) {
// Write out all the content in the cyclic buffers.
mutex_lock l(execution_buffer_mu_);
while (!execution_buffer_.empty()) {
SerializeAndWriteDebugEvent(&execution_buffer_.front(), EXECUTION);
execution_buffer_.pop_front();
}
}
TF_RETURN_IF_ERROR(execution_writer_->Flush());
}
if (graph_execution_traces_writer_ != nullptr) {
if (cyclic_buffer_size_ > 0) {
// Write out all the content in the cyclic buffers.
mutex_lock l(graph_execution_trace_buffer_mu_);
while (!graph_execution_trace_buffer_.empty()) {
SerializeAndWriteDebugEvent(&graph_execution_trace_buffer_.front(),
GRAPH_EXECUTION_TRACES);
graph_execution_trace_buffer_.pop_front();
}
}
TF_RETURN_IF_ERROR(graph_execution_traces_writer_->Flush());
}
return Status::OK();
}
// Returns the full path of the debug-event file of the given type.
//
// If no file prefix has been computed yet, Init() is invoked to compute it;
// an Init() failure is ignored here and the name is built from whatever
// prefix is present.
string DebugEventsWriter::FileName(DebugEventFileType type) {
if (file_prefix_.empty()) {
Init().IgnoreError();
}
return GetFileNameInternal(type);
}
// Flushes and closes every debug-event file owned by this writer.
//
// Order: the metadata writer is closed first; the non-execution files are
// flushed and closed; the execution files (including any cyclic-buffer
// content) are flushed and closed. A flush failure returns immediately with
// that error. Individual close failures are collected, and if there are any a
// FailedPrecondition error reporting how many files failed is returned.
// Every writer that was closed is reset to null, so calling Close() again is
// harmless.
Status DebugEventsWriter::Close() {
  std::vector<string> failed_to_close_files;

  if (metadata_writer_ != nullptr) {
    if (!metadata_writer_->Close().ok()) {
      failed_to_close_files.push_back(metadata_writer_->FileName());
    }
    metadata_writer_.reset(nullptr);
  }

  TF_RETURN_IF_ERROR(FlushNonExecutionFiles());
  if (source_files_writer_ != nullptr) {
    if (!source_files_writer_->Close().ok()) {
      failed_to_close_files.push_back(source_files_writer_->FileName());
    }
    source_files_writer_.reset(nullptr);
  }
  if (stack_frames_writer_ != nullptr) {
    if (!stack_frames_writer_->Close().ok()) {
      failed_to_close_files.push_back(stack_frames_writer_->FileName());
    }
    stack_frames_writer_.reset(nullptr);
  }
  if (graphs_writer_ != nullptr) {
    if (!graphs_writer_->Close().ok()) {
      failed_to_close_files.push_back(graphs_writer_->FileName());
    }
    graphs_writer_.reset(nullptr);
  }

  TF_RETURN_IF_ERROR(FlushExecutionFiles());
  if (execution_writer_ != nullptr) {
    if (!execution_writer_->Close().ok()) {
      failed_to_close_files.push_back(execution_writer_->FileName());
    }
    execution_writer_.reset(nullptr);
  }
  if (graph_execution_traces_writer_ != nullptr) {
    if (!graph_execution_traces_writer_->Close().ok()) {
      failed_to_close_files.push_back(
          graph_execution_traces_writer_->FileName());
    }
    graph_execution_traces_writer_.reset(nullptr);
  }

  if (failed_to_close_files.empty()) {
    return Status::OK();
  }
  // The errors:: helpers concatenate their arguments rather than interpreting
  // printf-style conversions, so the count is passed as its own argument.
  return errors::FailedPrecondition(
      "Failed to close ", failed_to_close_files.size(),
      " debug-events files associated with tfdbg");
}
// static
std::unordered_map<string, std::unique_ptr<DebugEventsWriter>>*
DebugEventsWriter::GetDebugEventsWriterMap() {
static std::unordered_map<string, std::unique_ptr<DebugEventsWriter>>*
writer_pool =
new std::unordered_map<string, std::unique_ptr<DebugEventsWriter>>();
return writer_pool;
}
// Constructs a writer for `dump_root` with the given cyclic buffer size,
// using the default Env. Nothing is created on disk here; the writer starts
// uninitialized with empty buffers, and files are opened by Init().
DebugEventsWriter::DebugEventsWriter(const string& dump_root,
int64 cyclic_buffer_size)
: env_(Env::Default()),
dump_root_(dump_root),
is_initialized_(false),
initialization_mu_(),
cyclic_buffer_size_(cyclic_buffer_size),
execution_buffer_(),
execution_buffer_mu_(),
graph_execution_trace_buffer_(),
graph_execution_trace_buffer_mu_() {}
// Replaces the writer for `type` with a fresh SingleDebugEventFileWriter
// bound to the file name derived from the current file prefix, and opens it.
// Returns the (context-annotated) error from the writer's Init() on failure.
Status DebugEventsWriter::InitNonMetadataFile(DebugEventFileType type) {
std::unique_ptr<SingleDebugEventFileWriter>* writer = nullptr;
// `writer` ends up pointing at the member unique_ptr that matches `type`.
SelectWriter(type, &writer);
const string filename = GetFileNameInternal(type);
// Dispose of any previous writer before installing the new one.
writer->reset();
writer->reset(new SingleDebugEventFileWriter(filename));
if (*writer == nullptr) {
return errors::Unknown("Could not create debug event file writer for ",
filename);
}
TF_RETURN_WITH_CONTEXT_IF_ERROR(
(*writer)->Init(), "Initializing debug event writer at path ", filename);
VLOG(1) << "Successfully opened debug event file: " << filename;
return Status::OK();
}
// Timestamps `debug_event` (if it has no wall time yet), serializes it and
// hands the bytes to the file writer that corresponds to `type`.
//
// The event is dropped, with an error log, when the writer for `type` does
// not exist -- i.e. before Init() has created it or after Close() has reset
// it. Init() is deliberately not called from here: Init() itself calls this
// function while holding initialization_mu_.
void DebugEventsWriter::SerializeAndWriteDebugEvent(DebugEvent* debug_event,
                                                    DebugEventFileType type) {
  std::unique_ptr<SingleDebugEventFileWriter>* writer = nullptr;
  SelectWriter(type, &writer);
  if (writer == nullptr) {
    // SelectWriter() did not recognize `type`.
    return;
  }
  if (*writer == nullptr) {
    // `writer` addresses a member unique_ptr and is therefore non-null even
    // when no file writer has been created; the pointee must be checked too.
    LOG(ERROR) << "Dropping debug event of file type "
               << static_cast<int>(type)
               << ": the file writer has not been initialized or has already "
                  "been closed.";
    return;
  }
  // Timestamp is in seconds, with double precision.
  MaybeSetDebugEventTimestamp(debug_event, env_);
  string str;
  debug_event->AppendToString(&str);
  (*writer)->WriteSerializedDebugEvent(str);
}
// Sets `*writer` to the address of the member unique_ptr that holds the file
// writer for `type`. The unique_ptr itself may still be empty. If `type`
// matches none of the cases, `*writer` is left unmodified.
void DebugEventsWriter::SelectWriter(
DebugEventFileType type,
std::unique_ptr<SingleDebugEventFileWriter>** writer) {
switch (type) {
case METADATA:
*writer = &metadata_writer_;
break;
case SOURCE_FILES:
*writer = &source_files_writer_;
break;
case STACK_FRAMES:
*writer = &stack_frames_writer_;
break;
case GRAPHS:
*writer = &graphs_writer_;
break;
case EXECUTION:
*writer = &execution_writer_;
break;
case GRAPH_EXECUTION_TRACES:
*writer = &graph_execution_traces_writer_;
break;
}
}
// Maps a debug-event file type to its file-name suffix constant. Any value
// outside the known file types maps to the empty string.
const string DebugEventsWriter::GetSuffix(DebugEventFileType type) {
  switch (type) {
    case METADATA:
      return kMetadataSuffix;
    case SOURCE_FILES:
      return kSourceFilesSuffix;
    case STACK_FRAMES:
      return kStackFramesSuffix;
    case GRAPHS:
      return kGraphsSuffix;
    case EXECUTION:
      return kExecutionSuffix;
    case GRAPH_EXECUTION_TRACES:
      return kGraphExecutionTracesSuffix;
    default:
      break;
  }
  return string();
}
// Builds the file name for `type` as "<file_prefix_>.<suffix>". Unlike
// FileName(), this never triggers Init().
string DebugEventsWriter::GetFileNameInternal(DebugEventFileType type) {
  return strings::StrCat(file_prefix_, ".", GetSuffix(type));
}
} // namespace tfdbg
} // namespace tensorflow

View File

@ -16,6 +16,8 @@ limitations under the License.
#ifndef TENSORFLOW_CORE_UTIL_DEBUG_EVENTS_WRITER_H_
#define TENSORFLOW_CORE_UTIL_DEBUG_EVENTS_WRITER_H_
#include <deque>
#include "tensorflow/core/lib/core/status.h"
#include "tensorflow/core/lib/io/record_writer.h"
#include "tensorflow/core/platform/env.h"
@ -24,6 +26,7 @@ limitations under the License.
#include "tensorflow/core/protobuf/debug_event.pb.h"
namespace tensorflow {
namespace tfdbg {
// The set of files generated by a debugged TensorFlow program.
enum DebugEventFileType {
@ -35,10 +38,41 @@ enum DebugEventFileType {
GRAPH_EXECUTION_TRACES,
};
// Helper class for DebugEventsWriter.
// This class manages the writing of data to a single TFRecord file.
// Each object of the DebugEventsWriter class below involves multiple
// TFRecord files, and hence utilizes multiple objects of this helper class.
class SingleDebugEventFileWriter {
public:
// Binds the writer to `file_path`; the file is not opened until Init().
explicit SingleDebugEventFileWriter(const string& file_path);
// Creates the file and the record writer on top of it. Returns OK right away
// if the record writer already exists.
Status Init();
// Appends one serialized DebugEvent as a record, calling Init() first if
// needed. Errors are not reported to the caller.
void WriteSerializedDebugEvent(tensorflow::StringPiece debug_event_str);
// Flushes the record writer and syncs the file. No-op when no events have
// been written since the last successful flush.
Status Flush();
// Flushes, then closes and releases the file.
Status Close();
// Returns the file path passed to the constructor.
const string FileName();
private:
Env* env_;
const string file_path_;
// Number of events written since the last successful Flush().
std::atomic_int_fast32_t num_outstanding_events_;
std::unique_ptr<WritableFile> writable_file_;
// Writes records into writable_file_.
std::unique_ptr<io::RecordWriter> record_writer_ PT_GUARDED_BY(writer_mu_);
mutex writer_mu_;
};
// The DebugEvents writer class.
class DebugEventsWriter {
public:
#ifndef SWIG
// Prefix of version string present in the first entry of every event file.
// Default size of each cyclic buffer (unit: number of DebugEvent protos).
static constexpr const int64 kDefaultCyclicBufferSize = 1000;
static constexpr const char* kFileNamePrefix = "tfdbg_events";
static constexpr const char* kMetadataSuffix = "metadata";
static constexpr const char* kSourceFilesSuffix = "source_files";
@ -56,18 +90,18 @@ class DebugEventsWriter {
// For a given dump_root value, it is a singleton. tfdbg event files come in
// sets of six. The singleton pattern avoids storing multiple sets in a single
// folder, which might cause confusion.
//
// Args:
// dump_root: Dump root directory. If it doesn't exist, will be created.
// cyclic_buffer_size: Cyclic buffer size (in number of DebugEvent protos).
// If set to a value <=0, will abolish the cyclic-buffer behavior.
// Returns:
// A pointer to a DebugEventsWriter object: a per-dump_root singleton.
static DebugEventsWriter* GetDebugEventsWriter(const string& dump_root,
int64 cyclic_buffer_size);
// Same as the 2-arg factory method above, but uses the default cyclic buffer
// size.
static DebugEventsWriter* GetDebugEventsWriter(const string& dump_root);
// Get the size of the cyclic buffer for execution data (in number
// of DebugEvent protos).
static const int64 GetDebugEventsWriterExecutionBufferSize();
// Set the size of the cyclic buffer for execution data (in number
// of DebugEvent protos). Applies to all dump roots that will be
// created after this call. Setting it to a negative number abolishes
// the cyclic-buffer behavior.
static void SetDebugEventsWriterExecutionBufferSize(int64 size);
~DebugEventsWriter();
// Sets the debug event filenames and opens file for writing.
@ -81,23 +115,28 @@ class DebugEventsWriter {
// The four DebugEvent fields below are written _without_ the cyclic buffer.
// Source file contents are written to the *.source_files file.
void WriteSourceFile(const SourceFile& source_file);
// Takes ownership of source_file.
void WriteSourceFile(SourceFile* source_file);
// Stack frames are written to the *.stack_frames file.
void WriteStackFrameWithId(const StackFrameWithId& stack_frame_with_id);
// Takes ownership of stack_frame_with_id.
void WriteStackFrameWithId(StackFrameWithId* stack_frame_with_id);
// Graph op creation events are written to the *.graphs file.
void WriteGraphOpCreation(const GraphOpCreation& graph_op_creation);
// Takes ownership of graph_op_creation.
void WriteGraphOpCreation(GraphOpCreation* graph_op_creation);
// Debugged graphs are written to the *.graphs file.
void WriteDebuggedGraph(const DebuggedGraph& debugged_graph);
// Takes ownership of debugged_graph.
void WriteDebuggedGraph(DebuggedGraph* debugged_graph);
// The two DebugEvent fields below are written to the cyclic buffer
// and saved to disk only at the FlushExecutionFiles() call.
// Execution events (eager execution of an op or a tf.function) are written to
// the *.execution file.
void WriteEagerExecution(const Execution& execution);
// Takes ownership of execution.
void WriteExecution(Execution* execution);
// Graph execution traces (graph-internal tensor values or their summaries)
// are written to the *.graph_execution_traces file.
void WriteGraphExecutionTrace(
const GraphExecutionTrace& graph_execution_trace);
// Takes ownership of graph_execution_trace.
void WriteGraphExecutionTrace(GraphExecutionTrace* graph_execution_trace);
// EventWriter automatically flushes and closes on destruction, but
// this method is provided for users who want to write to disk sooner
@ -115,16 +154,59 @@ class DebugEventsWriter {
Status Close();
private:
// Blocks access to constructor for singleton pattern.
DebugEventsWriter(const string& dump_root);
static std::unordered_map<string, std::unique_ptr<DebugEventsWriter>>*
// Get a static map from dump-root path to DebugEventsWriter objects.
// This supports the per-dump-root singleton pattern.
GetDebugEventsWriterMap();
// Guards calls to the GetDebugEventsWriter() method.
static mutex factory_mu_;
DebugEventsWriter(const string& dump_root, int64 cyclic_buffer_size);
// Get the path prefix. The same for all files, which differ only in the
// suffix.
string FileName(DebugEventFileType type);
TF_DISALLOW_COPY_AND_ASSIGN(DebugEventsWriter);
}
// Initialize the TFRecord writer for non-metadata file type.
Status InitNonMetadataFile(DebugEventFileType type);
void SerializeAndWriteDebugEvent(DebugEvent* debug_event,
DebugEventFileType type);
void SelectWriter(DebugEventFileType type,
std::unique_ptr<SingleDebugEventFileWriter>** writer);
const string GetSuffix(DebugEventFileType type);
string GetFileNameInternal(DebugEventFileType type);
Env* env_;
const string dump_root_;
string file_prefix_;
bool is_initialized_ GUARDED_BY(initialization_mu_);
mutex initialization_mu_;
const int64 cyclic_buffer_size_;
std::deque<DebugEvent> execution_buffer_ GUARDED_BY(execution_buffer_mu_);
mutex execution_buffer_mu_;
std::deque<DebugEvent> graph_execution_trace_buffer_
GUARDED_BY(graph_execution_trace_buffer_mu_);
mutex graph_execution_trace_buffer_mu_;
std::unique_ptr<SingleDebugEventFileWriter> metadata_writer_;
std::unique_ptr<SingleDebugEventFileWriter> source_files_writer_;
std::unique_ptr<SingleDebugEventFileWriter> stack_frames_writer_;
std::unique_ptr<SingleDebugEventFileWriter> graphs_writer_;
std::unique_ptr<SingleDebugEventFileWriter> execution_writer_;
std::unique_ptr<SingleDebugEventFileWriter> graph_execution_traces_writer_;
TF_DISALLOW_COPY_AND_ASSIGN(DebugEventsWriter);
friend class DebugEventsWriterTest;
};
} // namespace tfdbg
} // namespace tensorflow
#endif // TENSORFLOW_CORE_UTIL_DEBUG_EVENTS_WRITER_H_

View File

@ -0,0 +1,801 @@
/* Copyright 2019 The TensorFlow Authors. All Rights Reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
==============================================================================*/
#include "tensorflow/core/util/debug_events_writer.h"
#include <vector>
#include "tensorflow/core/lib/core/status_test_util.h"
#include "tensorflow/core/lib/core/threadpool.h"
#include "tensorflow/core/lib/io/path.h"
#include "tensorflow/core/lib/io/record_reader.h"
#include "tensorflow/core/lib/strings/stringprintf.h"
#include "tensorflow/core/platform/test.h"
#include "tensorflow/core/protobuf/graph_debug_info.pb.h"
namespace tensorflow {
namespace tfdbg {
// Shorthand accessor for Env::Default(), used by the test fixture below.
Env* env() { return Env::Default(); }
// Test fixture for DebugEventsWriter. Each test gets its own dump root under
// the test temp directory (named after the current time in microseconds),
// which is removed recursively in TearDown() if it was created.
class DebugEventsWriterTest : public ::testing::Test {
 public:
  // Returns the path of the file of the given type managed by `writer`.
  static string GetDebugEventFileName(DebugEventsWriter* writer,
                                      DebugEventFileType type) {
    return writer->FileName(type);
  }

  // Reads every DebugEvent record from the file of the given type into
  // `protos` (which is cleared first). Crashes via TF_CHECK_OK if the file
  // cannot be opened.
  static void ReadDebugEventProtos(DebugEventsWriter* writer,
                                   DebugEventFileType type,
                                   std::vector<DebugEvent>* protos) {
    protos->clear();
    const string filename = writer->FileName(type);
    std::unique_ptr<RandomAccessFile> debug_events_file;
    TF_CHECK_OK(env()->NewRandomAccessFile(filename, &debug_events_file));
    // Declared after the file so that it is destroyed before the file it
    // reads from.
    std::unique_ptr<io::RecordReader> reader(
        new io::RecordReader(debug_events_file.get()));

    uint64 offset = 0;
    DebugEvent actual;
    while (ReadDebugEventProto(reader.get(), &offset, &actual)) {
      protos->push_back(actual);
    }
  }

  // Reads one record at `*offset` and parses it into `proto`. Returns false
  // when the read fails (e.g. at end of file) or the record does not parse.
  static bool ReadDebugEventProto(io::RecordReader* reader, uint64* offset,
                                  DebugEvent* proto) {
    tstring record;
    Status s = reader->ReadRecord(offset, &record);
    if (!s.ok()) {
      return false;
    }
    return ParseProtoUnlimited(proto, record);
  }

  void SetUp() override {
    // The explicit cast makes the argument type match the %lld conversion;
    // NowMicros() returns an unsigned 64-bit value.
    dump_root_ = io::JoinPath(
        testing::TmpDir(),
        strings::Printf("%010lld",
                        static_cast<long long>(env()->NowMicros())));
  }

  void TearDown() override {
    if (env()->IsDirectory(dump_root_).ok()) {
      int64 undeleted_files = 0;
      int64 undeleted_dirs = 0;
      ASSERT_TRUE(
          env()
              ->DeleteRecursively(dump_root_, &undeleted_files, &undeleted_dirs)
              .ok());
      ASSERT_EQ(0, undeleted_files);
      ASSERT_EQ(0, undeleted_dirs);
    }
  }

  string dump_root_;
};
TEST_F(DebugEventsWriterTest, GetDebugEventsWriterSameRootGivesSameObject) {
  // Two lookups with the same dump root must hand back the very same object
  // (per-dump_root_ singleton).
  DebugEventsWriter* const first_lookup =
      DebugEventsWriter::GetDebugEventsWriter(dump_root_);
  DebugEventsWriter* const second_lookup =
      DebugEventsWriter::GetDebugEventsWriter(dump_root_);
  EXPECT_EQ(first_lookup, second_lookup);
}
// Requests the writer for the same dump root from four pool threads and
// checks that every thread received the same object.
TEST_F(DebugEventsWriterTest, ConcurrentGetDebugEventsWriterSameDumpRoot) {
  thread::ThreadPool* thread_pool =
      new thread::ThreadPool(Env::Default(), "test_pool", 4);

  std::vector<DebugEventsWriter*> writers;
  mutex mu;
  auto fn = [this, &writers, &mu]() {
    DebugEventsWriter* writer =
        DebugEventsWriter::GetDebugEventsWriter(dump_root_);
    {
      mutex_lock l(mu);
      writers.push_back(writer);
    }
  };
  for (size_t i = 0; i < 4; ++i) {
    thread_pool->Schedule(fn);
  }
  // Deleting the pool is what this test relies on to have all scheduled work
  // finished before the results are inspected.
  delete thread_pool;

  // Fatal assertion: the element accesses below would be out of bounds if
  // fewer than four results were collected.
  ASSERT_EQ(writers.size(), 4);
  EXPECT_EQ(writers[0], writers[1]);
  EXPECT_EQ(writers[1], writers[2]);
  EXPECT_EQ(writers[2], writers[3]);
}
// Requests writers for three distinct dump roots from three pool threads and
// checks that all returned objects differ.
TEST_F(DebugEventsWriterTest, ConcurrentGetDebugEventsWriterDiffDumpRoots) {
  thread::ThreadPool* thread_pool =
      new thread::ThreadPool(Env::Default(), "test_pool", 3);

  std::atomic_int_fast64_t counter(0);
  std::vector<DebugEventsWriter*> writers;
  mutex mu;
  auto fn = [this, &counter, &writers, &mu]() {
    // Each invocation gets a unique sub-directory name from the counter. The
    // cast makes the argument match %lld whatever int_fast64_t is on the
    // platform.
    const string new_dump_root = io::JoinPath(
        dump_root_,
        strings::Printf("%lld", static_cast<long long>(counter.fetch_add(1))));
    DebugEventsWriter* writer =
        DebugEventsWriter::GetDebugEventsWriter(new_dump_root);
    {
      mutex_lock l(mu);
      writers.push_back(writer);
    }
  };
  for (size_t i = 0; i < 3; ++i) {
    thread_pool->Schedule(fn);
  }
  // Deleting the pool is what this test relies on to have all scheduled work
  // finished before the results are inspected.
  delete thread_pool;

  // Fatal assertion: the element accesses below would be out of bounds if
  // fewer than three results were collected.
  ASSERT_EQ(writers.size(), 3);
  EXPECT_NE(writers[0], writers[1]);
  EXPECT_NE(writers[0], writers[2]);
  EXPECT_NE(writers[1], writers[2]);
}
TEST_F(DebugEventsWriterTest, GetDebugEventsWriterDifferentRoots) {
  // Distinct dump roots (here: a directory and one of its sub-directories)
  // must map to distinct DebugEventsWriter objects.
  const string nested_dump_root = io::JoinPath(dump_root_, "subdirectory");
  DebugEventsWriter* const outer_writer =
      DebugEventsWriter::GetDebugEventsWriter(dump_root_);
  DebugEventsWriter* const nested_writer =
      DebugEventsWriter::GetDebugEventsWriter(nested_dump_root);
  EXPECT_NE(outer_writer, nested_writer);
}
// Init() followed by Close() must leave a metadata file holding exactly one
// event, plus empty source-files and stack-frames files.
TEST_F(DebugEventsWriterTest, GetAndInitDebugEventsWriter) {
  DebugEventsWriter* writer =
      DebugEventsWriter::GetDebugEventsWriter(dump_root_);
  TF_ASSERT_OK(writer->Init());
  TF_ASSERT_OK(writer->Close());

  // Verify the metadata file's content.
  std::vector<DebugEvent> actuals;
  ReadDebugEventProtos(writer, DebugEventFileType::METADATA, &actuals);
  // Fatal assertion: actuals[0] is accessed below.
  ASSERT_EQ(actuals.size(), 1);
  EXPECT_GT(actuals[0].debug_metadata().tensorflow_version().length(), 0);
  // Check the content of the file version string.
  const string file_version = actuals[0].debug_metadata().file_version();
  EXPECT_EQ(file_version.find(DebugEventsWriter::kVersionPrefix), 0);
  EXPECT_GT(file_version.size(), strlen(DebugEventsWriter::kVersionPrefix));

  // Verify that the .source_files file has been created and is empty.
  // (ReadDebugEventProtos crashes if the file cannot be opened.)
  ReadDebugEventProtos(writer, DebugEventFileType::SOURCE_FILES, &actuals);
  EXPECT_EQ(actuals.size(), 0);
  // Verify that the .stack_frames file has been created and is empty.
  ReadDebugEventProtos(writer, DebugEventFileType::STACK_FRAMES, &actuals);
  EXPECT_EQ(actuals.size(), 0);
}
// Close() must succeed on a writer on which the test never called Init().
TEST_F(DebugEventsWriterTest, CallingCloseWithoutInitIsOkay) {
DebugEventsWriter* writer =
DebugEventsWriter::GetDebugEventsWriter(dump_root_);
TF_ASSERT_OK(writer->Close());
}
// A second Close() on an already-closed writer must also return OK.
TEST_F(DebugEventsWriterTest, CallingCloseTwiceIsOkay) {
DebugEventsWriter* writer =
DebugEventsWriter::GetDebugEventsWriter(dump_root_);
TF_ASSERT_OK(writer->Close());
TF_ASSERT_OK(writer->Close());
}
TEST_F(DebugEventsWriterTest, ConcurrentInitCalls) {
  // Test that concurrent calls to Init() work correctly: the metadata event
  // must be written exactly once.
  DebugEventsWriter* writer =
      DebugEventsWriter::GetDebugEventsWriter(dump_root_);

  thread::ThreadPool* thread_pool =
      new thread::ThreadPool(Env::Default(), "test_pool", 4);
  auto fn = [&writer]() { TF_ASSERT_OK(writer->Init()); };
  for (size_t i = 0; i < 3; ++i) {
    thread_pool->Schedule(fn);
  }
  // Deleting the pool is what this test relies on to have all scheduled work
  // finished before the writer is closed.
  delete thread_pool;

  TF_ASSERT_OK(writer->Close());

  // Verify the metadata file's content.
  std::vector<DebugEvent> actuals;
  ReadDebugEventProtos(writer, DebugEventFileType::METADATA, &actuals);
  // Fatal assertion: actuals[0] is accessed below.
  ASSERT_EQ(actuals.size(), 1);
  EXPECT_GT(actuals[0].debug_metadata().tensorflow_version().length(), 0);
  // Check the content of the file version string.
  const string file_version = actuals[0].debug_metadata().file_version();
  EXPECT_EQ(file_version.find(DebugEventsWriter::kVersionPrefix), 0);
  EXPECT_GT(file_version.size(), strlen(DebugEventsWriter::kVersionPrefix));

  // Verify that the .source_files file has been created and is empty.
  // (ReadDebugEventProtos crashes if the file cannot be opened.)
  ReadDebugEventProtos(writer, DebugEventFileType::SOURCE_FILES, &actuals);
  EXPECT_EQ(actuals.size(), 0);
  // Verify that the .stack_frames file has been created and is empty.
  ReadDebugEventProtos(writer, DebugEventFileType::STACK_FRAMES, &actuals);
  EXPECT_EQ(actuals.size(), 0);
}
TEST_F(DebugEventsWriterTest, InitTwiceDoesNotCreateNewMetadataFile) {
  // Test that Init() is idempotent: a second call neither changes the
  // metadata file name nor appends another metadata event.
  DebugEventsWriter* writer =
      DebugEventsWriter::GetDebugEventsWriter(dump_root_);
  TF_ASSERT_OK(writer->Init());

  std::vector<DebugEvent> actuals;
  ReadDebugEventProtos(writer, DebugEventFileType::METADATA, &actuals);
  // Fatal assertion: actuals[0] is accessed below.
  ASSERT_EQ(actuals.size(), 1);
  EXPECT_GT(actuals[0].debug_metadata().tensorflow_version().length(), 0);
  // A size is unsigned, so ">= 0" would hold for any value; require a
  // non-empty version string instead.
  EXPECT_GT(actuals[0].debug_metadata().file_version().size(), 0);

  string metadata_path_1 =
      GetDebugEventFileName(writer, DebugEventFileType::METADATA);
  TF_ASSERT_OK(writer->Init());
  EXPECT_EQ(GetDebugEventFileName(writer, DebugEventFileType::METADATA),
            metadata_path_1);
  TF_ASSERT_OK(writer->Close());

  // Verify the metadata file's content.
  ReadDebugEventProtos(writer, DebugEventFileType::METADATA, &actuals);
  ASSERT_EQ(actuals.size(), 1);
  EXPECT_GT(actuals[0].debug_metadata().tensorflow_version().length(), 0);
  EXPECT_GT(actuals[0].debug_metadata().file_version().size(), 0);
}
// Writes two SourceFile events and checks that they are read back intact and
// in order from the source-files file, and that no other file received them.
TEST_F(DebugEventsWriterTest, WriteSourceFile) {
  DebugEventsWriter* writer =
      DebugEventsWriter::GetDebugEventsWriter(dump_root_);
  TF_ASSERT_OK(writer->Init());

  // The writer takes ownership of the SourceFile objects passed to it.
  SourceFile* source_file_1 = new SourceFile();
  source_file_1->set_file_path("/home/tf_programs/main.py");
  source_file_1->set_host_name("localhost.localdomain");
  source_file_1->add_lines("import tensorflow as tf");
  source_file_1->add_lines("");
  source_file_1->add_lines("print(tf.constant([42.0]))");
  source_file_1->add_lines("");
  writer->WriteSourceFile(source_file_1);

  SourceFile* source_file_2 = new SourceFile();
  source_file_2->set_file_path("/home/tf_programs/train.py");
  source_file_2->set_host_name("localhost.localdomain");
  source_file_2->add_lines("import tensorflow.keras as keras");
  source_file_2->add_lines("");
  source_file_2->add_lines("model = keras.Sequential()");
  writer->WriteSourceFile(source_file_2);

  TF_ASSERT_OK(writer->FlushNonExecutionFiles());
  TF_ASSERT_OK(writer->Close());

  std::vector<DebugEvent> actuals;
  ReadDebugEventProtos(writer, DebugEventFileType::SOURCE_FILES, &actuals);
  // Fatal assertion: actuals[0] and actuals[1] are accessed below.
  ASSERT_EQ(actuals.size(), 2);
  EXPECT_GT(actuals[0].wall_time(), 0);
  // Wall times have microsecond granularity, so two back-to-back writes may
  // carry the same timestamp; only require that time does not go backwards.
  EXPECT_GE(actuals[1].wall_time(), actuals[0].wall_time());

  SourceFile actual_source_file_1 = actuals[0].source_file();
  EXPECT_EQ(actual_source_file_1.file_path(), "/home/tf_programs/main.py");
  EXPECT_EQ(actual_source_file_1.host_name(), "localhost.localdomain");
  ASSERT_EQ(actual_source_file_1.lines().size(), 4);
  EXPECT_EQ(actual_source_file_1.lines()[0], "import tensorflow as tf");
  EXPECT_EQ(actual_source_file_1.lines()[1], "");
  EXPECT_EQ(actual_source_file_1.lines()[2], "print(tf.constant([42.0]))");
  EXPECT_EQ(actual_source_file_1.lines()[3], "");

  SourceFile actual_source_file_2 = actuals[1].source_file();
  EXPECT_EQ(actual_source_file_2.file_path(), "/home/tf_programs/train.py");
  EXPECT_EQ(actual_source_file_2.host_name(), "localhost.localdomain");
  ASSERT_EQ(actual_source_file_2.lines().size(), 3);
  EXPECT_EQ(actual_source_file_2.lines()[0],
            "import tensorflow.keras as keras");
  EXPECT_EQ(actual_source_file_2.lines()[1], "");
  EXPECT_EQ(actual_source_file_2.lines()[2], "model = keras.Sequential()");

  // Verify no cross talk in the other non-execution debug-event files.
  ReadDebugEventProtos(writer, DebugEventFileType::STACK_FRAMES, &actuals);
  EXPECT_EQ(actuals.size(), 0);
  ReadDebugEventProtos(writer, DebugEventFileType::GRAPHS, &actuals);
  EXPECT_EQ(actuals.size(), 0);
  ReadDebugEventProtos(writer, DebugEventFileType::EXECUTION, &actuals);
  EXPECT_EQ(actuals.size(), 0);
  ReadDebugEventProtos(writer, DebugEventFileType::GRAPH_EXECUTION_TRACES,
                       &actuals);
  EXPECT_EQ(actuals.size(), 0);
}
// Writes two StackFrameWithId protos and checks that both come back from the
// STACK_FRAMES file in write order with every field intact, and that the
// SOURCE_FILES and GRAPHS files receive nothing.
TEST_F(DebugEventsWriterTest, WriteStackFramesFile) {
  DebugEventsWriter* writer =
      DebugEventsWriter::GetDebugEventsWriter(dump_root_);
  TF_ASSERT_OK(writer->Init());

  // NOTE(review): the protos below are heap-allocated and never deleted in
  // this test; presumably the writer takes ownership -- confirm against the
  // DebugEventsWriter API.
  StackFrameWithId* stack_frame_1 = new StackFrameWithId();
  stack_frame_1->set_id("deadbeaf");
  GraphDebugInfo::FileLineCol* file_line_col =
      stack_frame_1->mutable_file_line_col();
  file_line_col->set_file_index(12);
  file_line_col->set_line(20);
  file_line_col->set_col(2);
  file_line_col->set_func("my_func");
  file_line_col->set_code(" x = y + z");

  StackFrameWithId* stack_frame_2 = new StackFrameWithId();
  stack_frame_2->set_id("eeeeeeec");
  file_line_col = stack_frame_2->mutable_file_line_col();
  file_line_col->set_file_index(12);
  file_line_col->set_line(21);
  file_line_col->set_col(4);
  file_line_col->set_func("my_func");
  file_line_col->set_code(" x = x ** 2.0");

  writer->WriteStackFrameWithId(stack_frame_1);
  writer->WriteStackFrameWithId(stack_frame_2);
  TF_ASSERT_OK(writer->FlushNonExecutionFiles());
  TF_ASSERT_OK(writer->Close());

  std::vector<DebugEvent> actuals;
  ReadDebugEventProtos(writer, DebugEventFileType::STACK_FRAMES, &actuals);
  // Fatal on mismatch: the element accesses below would otherwise index past
  // the end of the vector.
  ASSERT_EQ(actuals.size(), 2);
  EXPECT_GT(actuals[0].wall_time(), 0);
  // Timestamps are derived from a microsecond clock, so two back-to-back
  // writes may share a value; only require that time does not go backwards.
  EXPECT_GE(actuals[1].wall_time(), actuals[0].wall_time());

  const StackFrameWithId& actual_stack_frame_1 =
      actuals[0].stack_frame_with_id();
  EXPECT_EQ(actual_stack_frame_1.id(), "deadbeaf");
  const GraphDebugInfo::FileLineCol& file_line_col_1 =
      actual_stack_frame_1.file_line_col();
  EXPECT_EQ(file_line_col_1.file_index(), 12);
  EXPECT_EQ(file_line_col_1.line(), 20);
  EXPECT_EQ(file_line_col_1.col(), 2);
  EXPECT_EQ(file_line_col_1.func(), "my_func");
  EXPECT_EQ(file_line_col_1.code(), " x = y + z");

  const StackFrameWithId& actual_stack_frame_2 =
      actuals[1].stack_frame_with_id();
  EXPECT_EQ(actual_stack_frame_2.id(), "eeeeeeec");
  const GraphDebugInfo::FileLineCol& file_line_col_2 =
      actual_stack_frame_2.file_line_col();
  EXPECT_EQ(file_line_col_2.file_index(), 12);
  EXPECT_EQ(file_line_col_2.line(), 21);
  EXPECT_EQ(file_line_col_2.col(), 4);
  EXPECT_EQ(file_line_col_2.func(), "my_func");
  EXPECT_EQ(file_line_col_2.code(), " x = x ** 2.0");

  // Verify no cross talk in the other non-execution debug-event files.
  ReadDebugEventProtos(writer, DebugEventFileType::SOURCE_FILES, &actuals);
  EXPECT_EQ(actuals.size(), 0);
  ReadDebugEventProtos(writer, DebugEventFileType::GRAPHS, &actuals);
  EXPECT_EQ(actuals.size(), 0);
}
// Writes one GraphOpCreation and one DebuggedGraph proto and checks that both
// land in the GRAPHS file in write order, while the SOURCE_FILES and
// STACK_FRAMES files stay empty.
TEST_F(DebugEventsWriterTest, WriteGraphOpCreationAndDebuggedGraph) {
  DebugEventsWriter* writer =
      DebugEventsWriter::GetDebugEventsWriter(dump_root_);
  TF_ASSERT_OK(writer->Init());

  // NOTE(review): the protos are heap-allocated and never deleted here;
  // presumably the writer takes ownership -- confirm against the writer API.
  GraphOpCreation* graph_op_creation = new GraphOpCreation();
  graph_op_creation->set_op_type("MatMul");
  graph_op_creation->set_op_name("Dense_1/MatMul");
  writer->WriteGraphOpCreation(graph_op_creation);

  DebuggedGraph* debugged_graph = new DebuggedGraph();
  debugged_graph->set_graph_id("deadbeaf");
  debugged_graph->set_graph_name("my_func_graph");
  writer->WriteDebuggedGraph(debugged_graph);

  TF_ASSERT_OK(writer->FlushNonExecutionFiles());
  TF_ASSERT_OK(writer->Close());

  std::vector<DebugEvent> actuals;
  ReadDebugEventProtos(writer, DebugEventFileType::GRAPHS, &actuals);
  // Fatal on mismatch so that actuals[0] / actuals[1] are never read out of
  // bounds.
  ASSERT_EQ(actuals.size(), 2);
  EXPECT_GT(actuals[0].wall_time(), 0);
  // Timestamps are derived from a microsecond clock, so two back-to-back
  // writes may share a value; only require that time does not go backwards.
  EXPECT_GE(actuals[1].wall_time(), actuals[0].wall_time());

  const GraphOpCreation& actual_op_creation = actuals[0].graph_op_creation();
  EXPECT_EQ(actual_op_creation.op_type(), "MatMul");
  EXPECT_EQ(actual_op_creation.op_name(), "Dense_1/MatMul");

  const DebuggedGraph& actual_debugged_graph = actuals[1].debugged_graph();
  EXPECT_EQ(actual_debugged_graph.graph_id(), "deadbeaf");
  EXPECT_EQ(actual_debugged_graph.graph_name(), "my_func_graph");

  // Verify no cross talk in the other non-execution debug-event files.
  ReadDebugEventProtos(writer, DebugEventFileType::SOURCE_FILES, &actuals);
  EXPECT_EQ(actuals.size(), 0);
  ReadDebugEventProtos(writer, DebugEventFileType::STACK_FRAMES, &actuals);
  EXPECT_EQ(actuals.size(), 0);
}
// Schedules kConcurrentWrites WriteSourceFile() calls on a thread pool and
// checks that every one of them ends up in the SOURCE_FILES file. The order in
// which they were written is not checked, only the set of paths.
TEST_F(DebugEventsWriterTest, ConcurrentWriteCallsToTheSameFile) {
  const size_t kConcurrentWrites = 100;
  DebugEventsWriter* writer =
      DebugEventsWriter::GetDebugEventsWriter(dump_root_);
  TF_ASSERT_OK(writer->Init());

  std::atomic_int_fast64_t counter(0);
  auto fn = [&writer, &counter]() {
    // The cast makes the argument type match the "%ld" conversion on every
    // platform; the values stay well below 1000.
    const string file_path =
        strings::Printf("/home/tf_programs/program_%.3ld.py",
                        static_cast<long>(counter.fetch_add(1)));
    SourceFile* source_file = new SourceFile();
    source_file->set_file_path(file_path);
    source_file->set_host_name("localhost.localdomain");
    writer->WriteSourceFile(source_file);
  };
  {
    // The test relies on the pool's destruction to wait for every scheduled
    // closure before the writer is closed.
    thread::ThreadPool thread_pool(Env::Default(), "test_pool", 8);
    for (size_t i = 0; i < kConcurrentWrites; ++i) {
      thread_pool.Schedule(fn);
    }
  }
  TF_ASSERT_OK(writer->Close());

  std::vector<DebugEvent> actuals;
  ReadDebugEventProtos(writer, DebugEventFileType::SOURCE_FILES, &actuals);
  // Fatal on mismatch: the loops below index actuals[0 .. kConcurrentWrites).
  ASSERT_EQ(actuals.size(), kConcurrentWrites);

  std::vector<string> file_paths;
  std::vector<string> host_names;
  for (size_t i = 0; i < kConcurrentWrites; ++i) {
    file_paths.push_back(actuals[i].source_file().file_path());
    host_names.push_back(actuals[i].source_file().host_name());
  }
  // Writes may land in any order; the zero-padded index makes lexicographic
  // order equal to numeric order.
  std::sort(file_paths.begin(), file_paths.end());
  for (size_t i = 0; i < kConcurrentWrites; ++i) {
    EXPECT_EQ(file_paths[i],
              strings::Printf("/home/tf_programs/program_%.3ld.py",
                              static_cast<long>(i)));
    EXPECT_EQ(host_names[i], "localhost.localdomain");
  }
}
// Same as the concurrent-write test, except that every closure also calls
// FlushNonExecutionFiles() right after its write. All kConcurrentWrites
// source files must still be present exactly once.
TEST_F(DebugEventsWriterTest, ConcurrentWriteAndFlushCallsToTheSameFile) {
  const size_t kConcurrentWrites = 100;
  DebugEventsWriter* writer =
      DebugEventsWriter::GetDebugEventsWriter(dump_root_);
  TF_ASSERT_OK(writer->Init());

  std::atomic_int_fast64_t counter(0);
  auto fn = [&writer, &counter]() {
    // The cast makes the argument type match the "%ld" conversion on every
    // platform; the values stay well below 1000.
    const string file_path =
        strings::Printf("/home/tf_programs/program_%.3ld.py",
                        static_cast<long>(counter.fetch_add(1)));
    SourceFile* source_file = new SourceFile();
    source_file->set_file_path(file_path);
    source_file->set_host_name("localhost.localdomain");
    writer->WriteSourceFile(source_file);
    // NOTE: a failure here only leaves this closure early; the remaining
    // closures still run and the test is marked as failed.
    TF_ASSERT_OK(writer->FlushNonExecutionFiles());
  };
  {
    // The test relies on the pool's destruction to wait for every scheduled
    // closure before the writer is closed.
    thread::ThreadPool thread_pool(Env::Default(), "test_pool", 8);
    for (size_t i = 0; i < kConcurrentWrites; ++i) {
      thread_pool.Schedule(fn);
    }
  }
  TF_ASSERT_OK(writer->Close());

  std::vector<DebugEvent> actuals;
  ReadDebugEventProtos(writer, DebugEventFileType::SOURCE_FILES, &actuals);
  // Fatal on mismatch: the loops below index actuals[0 .. kConcurrentWrites).
  ASSERT_EQ(actuals.size(), kConcurrentWrites);

  std::vector<string> file_paths;
  std::vector<string> host_names;
  for (size_t i = 0; i < kConcurrentWrites; ++i) {
    file_paths.push_back(actuals[i].source_file().file_path());
    host_names.push_back(actuals[i].source_file().host_name());
  }
  // Writes may land in any order; the zero-padded index makes lexicographic
  // order equal to numeric order.
  std::sort(file_paths.begin(), file_paths.end());
  for (size_t i = 0; i < kConcurrentWrites; ++i) {
    EXPECT_EQ(file_paths[i],
              strings::Printf("/home/tf_programs/program_%.3ld.py",
                              static_cast<long>(i)));
    EXPECT_EQ(host_names[i], "localhost.localdomain");
  }
}
// Schedules kConcurrentWrites closures that, depending on their index modulo
// 3, write a SourceFile, a StackFrameWithId or a GraphOpCreation. Checks that
// each of the three non-execution files receives exactly its own third of the
// events.
TEST_F(DebugEventsWriterTest, ConcurrentWriteCallsToTheDifferentFiles) {
  const int32 kConcurrentWrites = 30;
  DebugEventsWriter* writer =
      DebugEventsWriter::GetDebugEventsWriter(dump_root_);
  TF_ASSERT_OK(writer->Init());

  std::atomic_int_fast32_t counter(0);
  auto fn = [&writer, &counter]() {
    const int32 index = counter.fetch_add(1);
    if (index % 3 == 0) {
      SourceFile* source_file = new SourceFile();
      source_file->set_file_path(
          strings::Printf("/home/tf_programs/program_%.2d.py", index));
      source_file->set_host_name("localhost.localdomain");
      writer->WriteSourceFile(source_file);
    } else if (index % 3 == 1) {
      StackFrameWithId* stack_frame = new StackFrameWithId();
      stack_frame->set_id(strings::Printf("e%.2d", index));
      writer->WriteStackFrameWithId(stack_frame);
    } else {
      GraphOpCreation* op_creation = new GraphOpCreation();
      op_creation->set_op_type("Log");
      op_creation->set_op_name(strings::Printf("Log_%.2d", index));
      writer->WriteGraphOpCreation(op_creation);
    }
  };
  {
    // The test relies on the pool's destruction to wait for every scheduled
    // closure before the writer is closed.
    thread::ThreadPool thread_pool(Env::Default(), "test_pool", 10);
    // Signed index to match the signed bound.
    for (int32 i = 0; i < kConcurrentWrites; ++i) {
      thread_pool.Schedule(fn);
    }
  }
  TF_ASSERT_OK(writer->Close());

  std::vector<DebugEvent> actuals;

  // Source files: indices 0, 3, 6, ...
  ReadDebugEventProtos(writer, DebugEventFileType::SOURCE_FILES, &actuals);
  // Fatal on mismatch: the loop below indexes actuals directly.
  ASSERT_EQ(actuals.size(), kConcurrentWrites / 3);
  std::vector<string> file_paths;
  std::vector<string> host_names;
  for (int32 i = 0; i < kConcurrentWrites / 3; ++i) {
    file_paths.push_back(actuals[i].source_file().file_path());
    host_names.push_back(actuals[i].source_file().host_name());
  }
  // Write order is nondeterministic; zero-padded indices sort numerically.
  std::sort(file_paths.begin(), file_paths.end());
  for (int32 i = 0; i < kConcurrentWrites / 3; ++i) {
    EXPECT_EQ(file_paths[i],
              strings::Printf("/home/tf_programs/program_%.2d.py", i * 3));
    EXPECT_EQ(host_names[i], "localhost.localdomain");
  }

  // Stack frames: indices 1, 4, 7, ...
  ReadDebugEventProtos(writer, DebugEventFileType::STACK_FRAMES, &actuals);
  ASSERT_EQ(actuals.size(), kConcurrentWrites / 3);
  std::vector<string> stack_frame_ids;
  for (int32 i = 0; i < kConcurrentWrites / 3; ++i) {
    stack_frame_ids.push_back(actuals[i].stack_frame_with_id().id());
  }
  std::sort(stack_frame_ids.begin(), stack_frame_ids.end());
  for (int32 i = 0; i < kConcurrentWrites / 3; ++i) {
    EXPECT_EQ(stack_frame_ids[i], strings::Printf("e%.2d", i * 3 + 1));
  }

  // Graph op creations: indices 2, 5, 8, ...
  ReadDebugEventProtos(writer, DebugEventFileType::GRAPHS, &actuals);
  ASSERT_EQ(actuals.size(), kConcurrentWrites / 3);
  std::vector<string> op_types;
  std::vector<string> op_names;
  for (int32 i = 0; i < kConcurrentWrites / 3; ++i) {
    op_types.push_back(actuals[i].graph_op_creation().op_type());
    op_names.push_back(actuals[i].graph_op_creation().op_name());
  }
  // Only the names need sorting; every op type is the same string.
  std::sort(op_names.begin(), op_names.end());
  for (int32 i = 0; i < kConcurrentWrites / 3; ++i) {
    EXPECT_EQ(op_types[i], "Log");
    EXPECT_EQ(op_names[i], strings::Printf("Log_%.2d", i * 3 + 2));
  }
}
// Execution events must stay out of the EXECUTION file until a flush is
// requested, even when more events are written than the cyclic buffer holds.
TEST_F(DebugEventsWriterTest, WriteExecutionWithCyclicBufferNoFlush) {
  const size_t kCyclicBufferSize = 10;
  DebugEventsWriter* writer =
      DebugEventsWriter::GetDebugEventsWriter(dump_root_, kCyclicBufferSize);
  TF_ASSERT_OK(writer->Init());

  // Serially write twice as many events as the buffer can hold, without ever
  // flushing.
  const size_t num_writes = kCyclicBufferSize * 2;
  for (size_t tensor_id = 0; tensor_id < num_writes; ++tensor_id) {
    Execution* event = new Execution();
    event->set_op_type("Log");
    event->add_input_tensor_ids(tensor_id);
    writer->WriteExecution(event);
  }

  // FlushExecutionFiles() was never called, so nothing should be on disk.
  std::vector<DebugEvent> events_on_disk;
  ReadDebugEventProtos(writer, DebugEventFileType::EXECUTION, &events_on_disk);
  EXPECT_EQ(events_on_disk.size(), 0);
}
// Verifies that FlushExecutionFiles() / Close() write the contents of the
// cyclic execution buffer to disk: after overfilling the buffer, only the most
// recent kCyclicBufferSize events are expected in the EXECUTION file. Exercised
// first with serial writes, then with concurrent writes.
TEST_F(DebugEventsWriterTest, WriteExecutionWithCyclicBufferFlush) {
  const size_t kCyclicBufferSize = 10;
  DebugEventsWriter* writer =
      DebugEventsWriter::GetDebugEventsWriter(dump_root_, kCyclicBufferSize);
  TF_ASSERT_OK(writer->Init());

  // First, write more debug events than the capacity of the cyclic buffer, in
  // a serial fashion, and then flush.
  for (size_t i = 0; i < kCyclicBufferSize * 2; ++i) {
    Execution* execution = new Execution();
    execution->set_op_type("Log");
    execution->add_input_tensor_ids(i);
    writer->WriteExecution(execution);
  }
  TF_ASSERT_OK(writer->FlushExecutionFiles());

  std::vector<DebugEvent> actuals;
  // Expect there to be only the last kCyclicBufferSize debug events,
  // and the order should be correct.
  ReadDebugEventProtos(writer, DebugEventFileType::EXECUTION, &actuals);
  // Fatal on mismatch: the loop below indexes actuals directly.
  ASSERT_EQ(actuals.size(), kCyclicBufferSize);
  for (size_t i = 0; i < kCyclicBufferSize; ++i) {
    EXPECT_EQ(actuals[i].execution().op_type(), "Log");
    // Fatal so that element 0 is never read from an empty repeated field.
    ASSERT_EQ(actuals[i].execution().input_tensor_ids().size(), 1);
    EXPECT_EQ(actuals[i].execution().input_tensor_ids()[0],
              kCyclicBufferSize + i);
  }

  // Second, write more than the capacity of the cyclic buffer,
  // in a concurrent fashion.
  std::atomic_int_fast64_t counter(0);
  auto fn = [&writer, &counter]() {
    Execution* execution = new Execution();
    execution->set_op_type("Abs");
    execution->add_input_tensor_ids(counter.fetch_add(1));
    writer->WriteExecution(execution);
  };
  {
    // The test relies on the pool's destruction to wait for every scheduled
    // closure before the writer is closed.
    thread::ThreadPool thread_pool(Env::Default(), "test_pool", 8);
    for (size_t i = 0; i < kCyclicBufferSize * 2; ++i) {
      thread_pool.Schedule(fn);
    }
  }
  TF_ASSERT_OK(writer->Close());

  ReadDebugEventProtos(writer, DebugEventFileType::EXECUTION, &actuals);
  // NOTE: This includes the events from the first stage above, because the
  // .execution file hasn't changed.
  ASSERT_EQ(actuals.size(), kCyclicBufferSize * 2);
  for (size_t i = 0; i < kCyclicBufferSize; ++i) {
    const size_t index = i + kCyclicBufferSize;
    EXPECT_EQ(actuals[index].execution().op_type(), "Abs");
    ASSERT_EQ(actuals[index].execution().input_tensor_ids().size(), 1);
    // Which of the concurrent writes survive in the buffer is not
    // deterministic, so only the value range is checked. The counter hands out
    // 0 .. 2 * kCyclicBufferSize - 1.
    EXPECT_GE(actuals[index].execution().input_tensor_ids()[0], 0);
    EXPECT_LT(actuals[index].execution().input_tensor_ids()[0],
              kCyclicBufferSize * 2);
  }

  // Verify no cross-talk.
  ReadDebugEventProtos(writer, DebugEventFileType::SOURCE_FILES, &actuals);
  EXPECT_EQ(actuals.size(), 0);
  ReadDebugEventProtos(writer, DebugEventFileType::STACK_FRAMES, &actuals);
  EXPECT_EQ(actuals.size(), 0);
  ReadDebugEventProtos(writer, DebugEventFileType::GRAPHS, &actuals);
  EXPECT_EQ(actuals.size(), 0);
  ReadDebugEventProtos(writer, DebugEventFileType::GRAPH_EXECUTION_TRACES,
                       &actuals);
  EXPECT_EQ(actuals.size(), 0);
}
// Graph-execution-trace events must stay out of the GRAPH_EXECUTION_TRACES
// file until a flush is requested, even when more events are written than the
// cyclic buffer holds.
TEST_F(DebugEventsWriterTest, WriteGrahExecutionTraceWithCyclicBufferNoFlush) {
  const size_t kCyclicBufferSize = 10;
  DebugEventsWriter* writer =
      DebugEventsWriter::GetDebugEventsWriter(dump_root_, kCyclicBufferSize);
  TF_ASSERT_OK(writer->Init());

  // Serially write more debug events than the capacity of the cyclic buffer,
  // without flushing.
  for (size_t i = 0; i < kCyclicBufferSize * 2; ++i) {
    GraphExecutionTrace* trace = new GraphExecutionTrace();
    // The cast makes the argument type match the "%ld" conversion on every
    // platform.
    trace->set_graph_id(
        strings::Printf("graph_%.2ld", static_cast<long>(i)));
    writer->WriteGraphExecutionTrace(trace);
  }

  std::vector<DebugEvent> actuals;
  // Before FlushExecutionFiles() is called, the file should be empty.
  ReadDebugEventProtos(writer, DebugEventFileType::GRAPH_EXECUTION_TRACES,
                       &actuals);
  EXPECT_EQ(actuals.size(), 0);
}
// Verifies that FlushExecutionFiles() / Close() write the contents of the
// cyclic graph-execution-trace buffer to disk: after overfilling the buffer,
// only the most recent kCyclicBufferSize events are expected in the
// GRAPH_EXECUTION_TRACES file. Exercised first with serial writes, then with
// concurrent writes.
TEST_F(DebugEventsWriterTest, WriteGrahExecutionTraceWithCyclicBufferFlush) {
  const size_t kCyclicBufferSize = 10;
  DebugEventsWriter* writer =
      DebugEventsWriter::GetDebugEventsWriter(dump_root_, kCyclicBufferSize);
  TF_ASSERT_OK(writer->Init());

  // First, write more debug events than the capacity of the cyclic buffer, in
  // a serial fashion, and then flush.
  for (size_t i = 0; i < kCyclicBufferSize * 2; ++i) {
    GraphExecutionTrace* trace = new GraphExecutionTrace();
    // The casts in this test make the argument type match the "%ld"
    // conversion on every platform.
    trace->set_graph_id(
        strings::Printf("graph_%.2ld", static_cast<long>(i)));
    writer->WriteGraphExecutionTrace(trace);
  }
  TF_ASSERT_OK(writer->FlushExecutionFiles());

  std::vector<DebugEvent> actuals;
  // Expect there to be only the last kCyclicBufferSize debug events,
  // and the order should be correct.
  ReadDebugEventProtos(writer, DebugEventFileType::GRAPH_EXECUTION_TRACES,
                       &actuals);
  // Fatal on mismatch: the loop below indexes actuals directly.
  ASSERT_EQ(actuals.size(), kCyclicBufferSize);
  for (size_t i = 0; i < kCyclicBufferSize; ++i) {
    EXPECT_EQ(actuals[i].graph_execution_trace().graph_id(),
              strings::Printf("graph_%.2ld",
                              static_cast<long>(i + kCyclicBufferSize)));
  }

  // Second, write more than the capacity of the cyclic buffer,
  // in a concurrent fashion.
  std::atomic_int_fast64_t counter(0);
  auto fn = [&writer, &counter]() {
    GraphExecutionTrace* trace = new GraphExecutionTrace();
    trace->set_graph_id(strings::Printf(
        "new_graph_%.2ld", static_cast<long>(counter.fetch_add(1))));
    writer->WriteGraphExecutionTrace(trace);
  };
  {
    // The test relies on the pool's destruction to wait for every scheduled
    // closure before the writer is closed.
    thread::ThreadPool thread_pool(Env::Default(), "test_pool", 8);
    for (size_t i = 0; i < kCyclicBufferSize * 2; ++i) {
      thread_pool.Schedule(fn);
    }
  }
  TF_ASSERT_OK(writer->Close());

  ReadDebugEventProtos(writer, DebugEventFileType::GRAPH_EXECUTION_TRACES,
                       &actuals);
  // NOTE: This includes the events from the first stage above, because the
  // .graph_execution_traces file hasn't changed.
  ASSERT_EQ(actuals.size(), kCyclicBufferSize * 2);
  for (size_t i = 0; i < kCyclicBufferSize; ++i) {
    const size_t index = i + kCyclicBufferSize;
    // Which concurrent writes survive is not deterministic, so only the
    // "new_graph_" prefix is checked.
    EXPECT_EQ(
        actuals[index].graph_execution_trace().graph_id().find("new_graph_"),
        0);
  }

  // Verify no cross-talk.
  ReadDebugEventProtos(writer, DebugEventFileType::SOURCE_FILES, &actuals);
  EXPECT_EQ(actuals.size(), 0);
  ReadDebugEventProtos(writer, DebugEventFileType::STACK_FRAMES, &actuals);
  EXPECT_EQ(actuals.size(), 0);
  ReadDebugEventProtos(writer, DebugEventFileType::GRAPHS, &actuals);
  EXPECT_EQ(actuals.size(), 0);
  ReadDebugEventProtos(writer, DebugEventFileType::EXECUTION, &actuals);
  EXPECT_EQ(actuals.size(), 0);
}
// With a cyclic buffer size of 0 the cyclic behavior is disabled: every
// execution and graph-execution-trace event written must be present, in write
// order, after FlushExecutionFiles().
TEST_F(DebugEventsWriterTest, DisableCyclicBufferBeahavior) {
  const size_t kCyclicBufferSize = 0;  // A value <= 0 disables cyclic behavior.
  DebugEventsWriter* writer =
      DebugEventsWriter::GetDebugEventsWriter(dump_root_, kCyclicBufferSize);
  TF_ASSERT_OK(writer->Init());

  const size_t kNumEvents = 20;

  // Execution events.
  for (size_t i = 0; i < kNumEvents; ++i) {
    Execution* execution = new Execution();
    execution->set_op_type("Log");
    execution->add_input_tensor_ids(i);
    writer->WriteExecution(execution);
  }
  TF_ASSERT_OK(writer->FlushExecutionFiles());

  std::vector<DebugEvent> actuals;
  ReadDebugEventProtos(writer, DebugEventFileType::EXECUTION, &actuals);
  // Fatal on mismatch: the loop below indexes actuals directly.
  ASSERT_EQ(actuals.size(), kNumEvents);
  for (size_t i = 0; i < kNumEvents; ++i) {
    EXPECT_EQ(actuals[i].execution().op_type(), "Log");
    // Fatal so that element 0 is never read from an empty repeated field.
    ASSERT_EQ(actuals[i].execution().input_tensor_ids().size(), 1);
    EXPECT_EQ(actuals[i].execution().input_tensor_ids()[0], i);
  }

  // Graph execution trace events.
  for (size_t i = 0; i < kNumEvents; ++i) {
    GraphExecutionTrace* trace = new GraphExecutionTrace();
    // The cast makes the argument type match the "%ld" conversion on every
    // platform.
    trace->set_graph_id(
        strings::Printf("graph_%.2ld", static_cast<long>(i)));
    writer->WriteGraphExecutionTrace(trace);
  }
  TF_ASSERT_OK(writer->FlushExecutionFiles());

  ReadDebugEventProtos(writer, DebugEventFileType::GRAPH_EXECUTION_TRACES,
                       &actuals);
  ASSERT_EQ(actuals.size(), kNumEvents);
  for (size_t i = 0; i < kNumEvents; ++i) {
    EXPECT_EQ(actuals[i].graph_execution_trace().graph_id(),
              strings::Printf("graph_%.2ld", static_cast<long>(i)));
  }
}
} // namespace tfdbg
} // namespace tensorflow