diff --git a/tensorflow/core/common_runtime/debugger_state_interface.h b/tensorflow/core/common_runtime/debugger_state_interface.h index a4ff6b74fc5..fb72f9fa3ea 100644 --- a/tensorflow/core/common_runtime/debugger_state_interface.h +++ b/tensorflow/core/common_runtime/debugger_state_interface.h @@ -39,6 +39,24 @@ class DebuggerStateInterface { // record. See the documentation of DebugNodeInserter::InsertNodes() for // details. virtual Status DecorateGraphForDebug(Graph* graph, Device* device) = 0; + + // Publish metadata about the debugged Session::Run() call. + // + // Args: + // global_step: A global step count supplied by the caller of + // Session::Run(). + // session_run_count: A counter for calls to the Run() method of the + // Session object. + // executor_step_count: A counter for invocations of the executor charged + // to serve this Session::Run() call. + // input_names: Name of the input Tensors (feed keys). + // output_names: Names of the fetched Tensors. + // target_names: Names of the target nodes. + virtual Status PublishDebugMetadata( + const int64 global_step, const int64 session_run_count, + const int64 executor_step_count, const std::vector& input_names, + const std::vector& output_names, + const std::vector& target_nodes) = 0; }; typedef std::function( diff --git a/tensorflow/core/common_runtime/direct_session.cc b/tensorflow/core/common_runtime/direct_session.cc index 953c4180fd7..f00f5ffd8f7 100644 --- a/tensorflow/core/common_runtime/direct_session.cc +++ b/tensorflow/core/common_runtime/direct_session.cc @@ -397,6 +397,9 @@ Status DirectSession::Run(const RunOptions& run_options, ExecutorsAndKeys* executors_and_keys; RunStateArgs run_state_args; + Executor::Args args; + args.step_id = step_id_counter_.fetch_add(1); + // EXPERIMENTAL: Options that allow the client to insert nodes into partition // graphs for debugging. if (!run_options.debug_options().debug_tensor_watch_opts().empty()) { @@ -407,10 +410,15 @@ Status DirectSession::Run(const RunOptions& run_options, TF_RETURN_IF_ERROR( GetOrCreateExecutors(pool, input_tensor_names, output_names, target_nodes, &executors_and_keys, &run_state_args)); + const int64 executor_step_count = executors_and_keys->step_count.fetch_add(1); + + if (run_state_args.debugger_state) { + TF_RETURN_IF_ERROR(run_state_args.debugger_state->PublishDebugMetadata( + run_options.debug_options().global_step(), args.step_id, + executor_step_count, input_tensor_names, output_names, target_nodes)); + } // Create a run state and start execution. - Executor::Args args; - args.step_id = step_id_counter_.fetch_add(1); RunState run_state(args.step_id, &devices_); run_state.rendez = new IntraProcessRendezvous(device_mgr_.get()); CancellationManager step_cancellation_manager; @@ -450,8 +458,7 @@ Status DirectSession::Run(const RunOptions& run_options, options_.config.graph_options().build_cost_model(); const int64 build_cost_model_after = options_.config.graph_options().build_cost_model_after(); - int measure_step_count = - executors_and_keys->step_count - build_cost_model_after; + int measure_step_count = executor_step_count - build_cost_model_after; if (measure_step_count >= 0) { update_cost_model = ((measure_step_count + 1) % build_cost_model_every == 0); @@ -527,7 +534,6 @@ Status DirectSession::Run(const RunOptions& run_options, // Build and return the cost model as instructed. mutex_lock l(executor_lock_); - ++executors_and_keys->step_count; if (update_cost_model) { // Build the cost model std::unordered_map device_to_graph; diff --git a/tensorflow/core/common_runtime/direct_session.h b/tensorflow/core/common_runtime/direct_session.h index ff4786f998d..3e3a5eaa8f4 100644 --- a/tensorflow/core/common_runtime/direct_session.h +++ b/tensorflow/core/common_runtime/direct_session.h @@ -125,7 +125,9 @@ class DirectSession : public Session { // library. Consider giving each partition its own function library to enable // per-partition rewrites. struct ExecutorsAndKeys { - int64 step_count = 0; + ExecutorsAndKeys() : step_count(0) {} + + std::atomic_int_fast64_t step_count; std::unique_ptr graph; NameNodeMap name_to_node; std::unique_ptr flib_def; diff --git a/tensorflow/core/debug/debug_graph_utils.cc b/tensorflow/core/debug/debug_graph_utils.cc index 06c129ecc84..217022759bc 100644 --- a/tensorflow/core/debug/debug_graph_utils.cc +++ b/tensorflow/core/debug/debug_graph_utils.cc @@ -75,6 +75,16 @@ Status DebuggerState::DecorateGraphForDebug(Graph* graph, Device* device) { return status; } +Status DebuggerState::PublishDebugMetadata( + const int64 global_step, const int64 session_run_count, + const int64 executor_step_count, const std::vector& input_names, + const std::vector& output_names, + const std::vector& target_nodes) { + return DebugIO::PublishDebugMetadata(global_step, session_run_count, + executor_step_count, input_names, + output_names, target_nodes, debug_urls_); +} + // static Status DebugNodeInserter::InsertNodes( const protobuf::RepeatedPtrField& watches, Graph* graph, diff --git a/tensorflow/core/debug/debug_graph_utils.h b/tensorflow/core/debug/debug_graph_utils.h index cda2c31f0ce..6edd26c2602 100644 --- a/tensorflow/core/debug/debug_graph_utils.h +++ b/tensorflow/core/debug/debug_graph_utils.h @@ -43,6 +43,17 @@ class DebuggerState : public DebuggerStateInterface { const protobuf::RepeatedPtrField& watches; + // Publish metadata about the debugged Session::Run() call. + // + // See the doc string of DebuggerStateInterface::PublishDebugMetadata() for + // details. + Status PublishDebugMetadata(const int64 global_step, + const int64 session_run_count, + const int64 executor_step_count, + const std::vector& input_names, + const std::vector& output_names, + const std::vector& target_names); + private: std::unordered_set debug_urls_; }; diff --git a/tensorflow/core/debug/debug_grpc_io_utils_test.cc b/tensorflow/core/debug/debug_grpc_io_utils_test.cc index acf1224be19..bb79528cc19 100644 --- a/tensorflow/core/debug/debug_grpc_io_utils_test.cc +++ b/tensorflow/core/debug/debug_grpc_io_utils_test.cc @@ -80,10 +80,10 @@ TEST_F(GrpcDebugTest, AttemptToSendToNonexistentGrpcAddress) { "foo_tensor", "DebugIdentity", tensor, Env::Default()->NowMicros(), {kInvalidGrpcUrl}); ASSERT_FALSE(publish_status.ok()); - ASSERT_NE( - string::npos, - publish_status.error_message().find( - "Channel at the following gRPC address is not ready: 0.0.0.0:0")); + ASSERT_NE(string::npos, + publish_status.error_message().find( + "Channel at the following gRPC stream URL is not ready: " + "grpc://0.0.0.0:0")); DebugIO::CloseDebugURL(kInvalidGrpcUrl); } diff --git a/tensorflow/core/debug/debug_io_utils.cc b/tensorflow/core/debug/debug_io_utils.cc index 78196007650..f4136dcabaa 100644 --- a/tensorflow/core/debug/debug_io_utils.cc +++ b/tensorflow/core/debug/debug_io_utils.cc @@ -98,6 +98,83 @@ const char* const DebugIO::kFileURLScheme = "file://"; // static const char* const DebugIO::kGrpcURLScheme = "grpc://"; +// static +Status DebugIO::PublishDebugMetadata( + const int64 global_step, const int64 session_run_count, + const int64 executor_step_count, const std::vector& input_names, + const std::vector& output_names, + const std::vector& target_nodes, + const std::unordered_set& debug_urls) { + std::ostringstream oss; + + // Construct a JSON string to carry the metadata. + oss << "{"; + oss << "\"global_step\":" << global_step << ","; + oss << "\"session_run_count\":" << session_run_count << ","; + oss << "\"executor_step_count\":" << executor_step_count << ","; + oss << "\"input_names\":["; + for (size_t i = 0; i < input_names.size(); ++i) { + oss << "\"" << input_names[i] << "\""; + if (i < input_names.size() - 1) { + oss << ","; + } + } + oss << "],"; + oss << "\"output_names\":["; + for (size_t i = 0; i < output_names.size(); ++i) { + oss << "\"" << output_names[i] << "\""; + if (i < output_names.size() - 1) { + oss << ","; + } + } + oss << "],"; + oss << "\"target_nodes\":["; + for (size_t i = 0; i < target_nodes.size(); ++i) { + oss << "\"" << target_nodes[i] << "\""; + if (i < target_nodes.size() - 1) { + oss << ","; + } + } + oss << "]"; + oss << "}"; + + const string json_metadata = oss.str(); + Event event; + event.set_wall_time(static_cast(Env::Default()->NowMicros())); + LogMessage* log_message = event.mutable_log_message(); + log_message->set_message(json_metadata); + + Status status; + for (const string& url : debug_urls) { + if (str_util::Lowercase(url).find(kGrpcURLScheme) == 0) { + Event grpc_event; + + // Determine the path (if any) in the grpc:// URL, and add it as a field + // of the JSON string. + const string address = url.substr(strlen(DebugIO::kFileURLScheme)); + const string path = address.find("/") == string::npos + ? "" + : address.substr(address.find("/")); + grpc_event.set_wall_time(event.wall_time()); + LogMessage* log_message_grpc = grpc_event.mutable_log_message(); + log_message_grpc->set_message( + strings::StrCat(json_metadata.substr(0, json_metadata.size() - 1), + ",\"grpc_path\":\"", path, "\"}")); + + status.Update( + DebugGrpcIO::SendEventProtoThroughGrpcStream(grpc_event, url)); + } else if (str_util::Lowercase(url).find(kFileURLScheme) == 0) { + const string dump_root_dir = url.substr(strlen(kFileURLScheme)); + const string file_name = + strings::StrCat("_tfdbg_core_metadata_", Env::Default()->NowMicros()); + status.Update( + DebugFileIO::DumpEventProtoToFile(event, dump_root_dir, file_name)); + } + } + + return status; +} + // static Status DebugIO::PublishDebugTensor(const string& tensor_name, const string& debug_op, const Tensor& tensor, @@ -136,10 +213,8 @@ Status DebugIO::PublishDebugTensor(const string& tensor_name, fail_statuses.push_back(s); } } else if (str_util::Lowercase(url).find(kGrpcURLScheme) == 0) { - const string grpc_server_stream_addr = url.substr(strlen(kGrpcURLScheme)); Status s = DebugGrpcIO::SendTensorThroughGrpcStream( - node_name, output_slot, debug_op, tensor, wall_time_us, - grpc_server_stream_addr); + node_name, output_slot, debug_op, tensor, wall_time_us, url); if (!s.ok()) { num_failed_urls++; @@ -189,8 +264,7 @@ Status DebugIO::PublishGraph(const Graph& graph, status.Update( DebugFileIO::DumpEventProtoToFile(event, dump_root_dir, file_name)); } else if (debug_url.find(kGrpcURLScheme) == 0) { - DebugGrpcIO::SendEventProtoThroughGrpcStream( - event, debug_url.substr(strlen(kGrpcURLScheme))); + DebugGrpcIO::SendEventProtoThroughGrpcStream(event, debug_url); } } @@ -200,8 +274,7 @@ Status DebugIO::PublishGraph(const Graph& graph, // static Status DebugIO::CloseDebugURL(const string& debug_url) { if (debug_url.find(DebugIO::kGrpcURLScheme) == 0) { - return DebugGrpcIO::CloseGrpcStream( - debug_url.substr(strlen(DebugIO::kGrpcURLScheme))); + return DebugGrpcIO::CloseGrpcStream(debug_url); } else { // No-op for non-gRPC URLs. return Status::OK(); @@ -348,57 +421,64 @@ std::unordered_map> DebugGrpcIO::stream_channels; // static -Status DebugGrpcIO::SendTensorThroughGrpcStream( - const string& node_name, const int32 output_slot, const string& debug_op, - const Tensor& tensor, const uint64 wall_time_us, - const string& server_stream_addr) { +Status DebugGrpcIO::SendTensorThroughGrpcStream(const string& node_name, + const int32 output_slot, + const string& debug_op, + const Tensor& tensor, + const uint64 wall_time_us, + const string& grpc_stream_url) { const string tensor_name = strings::StrCat(node_name, ":", output_slot); // Prepare tensor Event data to be sent. Event event = WrapTensorAsEvent(tensor_name, debug_op, tensor, wall_time_us); - return SendEventProtoThroughGrpcStream(event, server_stream_addr); + return SendEventProtoThroughGrpcStream(event, grpc_stream_url); } // static Status DebugGrpcIO::SendEventProtoThroughGrpcStream( - const Event& event_proto, const string& server_stream_addr) { + const Event& event_proto, const string& grpc_stream_url) { + const string addr_with_path = + grpc_stream_url.substr(strlen(DebugIO::kFileURLScheme)); + const string server_stream_addr = + addr_with_path.substr(0, addr_with_path.find('/')); + std::shared_ptr debug_grpc_channel; { mutex_lock l(streams_mu); - if (stream_channels.find(server_stream_addr) == stream_channels.end()) { + if (stream_channels.find(grpc_stream_url) == stream_channels.end()) { debug_grpc_channel.reset(new DebugGrpcChannel(server_stream_addr)); if (!debug_grpc_channel->is_channel_ready()) { return errors::FailedPrecondition( - strings::StrCat("Channel at the following gRPC address is ", - "not ready: ", server_stream_addr)); + strings::StrCat("Channel at the following gRPC stream URL is ", + "not ready: ", grpc_stream_url)); } - stream_channels[server_stream_addr] = debug_grpc_channel; + stream_channels[grpc_stream_url] = debug_grpc_channel; } else { - debug_grpc_channel = stream_channels[server_stream_addr]; + debug_grpc_channel = stream_channels[grpc_stream_url]; } } bool write_ok = debug_grpc_channel->WriteEvent(event_proto); if (!write_ok) { return errors::Cancelled(strings::StrCat("Write event to stream URL ", - server_stream_addr, "failed.")); + grpc_stream_url, "failed.")); } return Status::OK(); } -Status DebugGrpcIO::CloseGrpcStream(const string& server_stream_addr) { +Status DebugGrpcIO::CloseGrpcStream(const string& grpc_stream_url) { mutex_lock l(streams_mu); - if (stream_channels.find(server_stream_addr) != stream_channels.end()) { + if (stream_channels.find(grpc_stream_url) != stream_channels.end()) { // Stream of the specified address exists. Close it and remove it from // record. Status s; - s = stream_channels[server_stream_addr]->Close(); - stream_channels.erase(server_stream_addr); + s = stream_channels[grpc_stream_url]->Close(); + stream_channels.erase(grpc_stream_url); return s; } else { // Stream of the specified address does not exist. No action. diff --git a/tensorflow/core/debug/debug_io_utils.h b/tensorflow/core/debug/debug_io_utils.h index a10e24386ad..a12bea6dbcf 100644 --- a/tensorflow/core/debug/debug_io_utils.h +++ b/tensorflow/core/debug/debug_io_utils.h @@ -32,6 +32,13 @@ Status ReadEventFromFile(const string& dump_file_path, Event* event); class DebugIO { public: + static Status PublishDebugMetadata( + const int64 global_step, const int64 session_run_count, + const int64 executor_step_count, const std::vector& input_names, + const std::vector& output_names, + const std::vector& target_nodes, + const std::unordered_set& debug_urls); + // Publish a tensor to a debug target URL. // // Args: @@ -59,7 +66,6 @@ class DebugIO { static Status CloseDebugURL(const string& debug_url); - private: static const char* const kFileURLScheme; static const char* const kGrpcURLScheme; }; @@ -172,18 +178,18 @@ class DebugGrpcIO { const string& debug_op, const Tensor& tensor, const uint64 wall_time_us, - const string& server_stream_addr); + const string& grpc_stream_url); // Send an Event proto through a debug gRPC stream. // Thread-safety: Safe with respect to other calls to the same method and // calls to CloseGrpcStream(). - static Status SendEventProtoThroughGrpcStream( - const Event& event_proto, const string& server_stream_addr); + static Status SendEventProtoThroughGrpcStream(const Event& event_proto, + const string& grpc_stream_url); // Close a gRPC stream to the given address, if it exists. // Thread-safety: Safe with respect to other calls to the same method and // calls to SendTensorThroughGrpcStream(). - static Status CloseGrpcStream(const string& server_stream_addr); + static Status CloseGrpcStream(const string& grpc_stream_url); private: static mutex streams_mu; diff --git a/tensorflow/core/protobuf/debug.proto b/tensorflow/core/protobuf/debug.proto index 5b32f9fc0b6..f35d7569be8 100644 --- a/tensorflow/core/protobuf/debug.proto +++ b/tensorflow/core/protobuf/debug.proto @@ -27,6 +27,13 @@ message DebugTensorWatch { // E.g., "file:///foo/tfdbg_dump", "grpc://localhost:11011" // Each debug op listed in debug_ops will publish its output tensor (debug // signal) to all URLs in debug_urls. + // + // N.B. Session::Run() supports concurrent invocations of the same inputs + // (feed keys), outputs and target nodes. If such concurrent invocations + // are to be debugged, the callers of Session::Run() must use distinct + // debug_urls to make sure that the streamed or dumped events do not overlap + // among the invocations. + // TODO(cais): More visible documentation of this in g3docs. repeated string debug_urls = 4; } @@ -34,4 +41,9 @@ message DebugTensorWatch { message DebugOptions { // Debugging options repeated DebugTensorWatch debug_tensor_watch_opts = 4; + + // Caller-specified global step count. + // Note that this is distinct from the session run count and the executor + // step count. + int64 global_step = 10; } diff --git a/tensorflow/python/debug/BUILD b/tensorflow/python/debug/BUILD index 339a6a72e0d..4a83e33ae18 100644 --- a/tensorflow/python/debug/BUILD +++ b/tensorflow/python/debug/BUILD @@ -436,6 +436,7 @@ cuda_py_test( "//tensorflow/python:platform_test", "//tensorflow/python:variables", ], + tags = ["notsan"], ) py_test( diff --git a/tensorflow/python/debug/debug_data.py b/tensorflow/python/debug/debug_data.py index 110ce376073..9b1796eb490 100644 --- a/tensorflow/python/debug/debug_data.py +++ b/tensorflow/python/debug/debug_data.py @@ -19,6 +19,7 @@ from __future__ import division from __future__ import print_function import collections +import json import os import numpy as np @@ -31,6 +32,7 @@ from tensorflow.python.platform import gfile METADATA_FILE_PREFIX = "_tfdbg_" +CORE_METADATA_TAG = "core_metadata_" GRAPH_FILE_TAG = "graph_" FETCHES_INFO_FILE_TAG = "fetches_info_" FEED_KEYS_INFO_FILE_TAG = "feed_keys_info_" @@ -110,6 +112,10 @@ def parse_node_or_tensor_name(name): return name, None +def _is_core_metadata_file(file_name): + return file_name.startswith(METADATA_FILE_PREFIX + CORE_METADATA_TAG) + + def _is_graph_file(file_name): return file_name.startswith(METADATA_FILE_PREFIX + GRAPH_FILE_TAG) @@ -274,6 +280,20 @@ def has_inf_or_nan(datum, tensor): return False +def extract_core_metadata_from_event_proto(event): + json_metadata = json.loads(event.log_message.message) + core_metadata = collections.namedtuple("CoreMetadata", [ + "global_step", "session_run_count", "executor_step_count", "input_names", + "output_names", "target_nodes" + ]) + return core_metadata(json_metadata["global_step"], + json_metadata["session_run_count"], + json_metadata["executor_step_count"], + json_metadata["input_names"], + json_metadata["output_names"], + json_metadata["target_nodes"]) + + class DebugTensorDatum(object): """A single tensor dumped by TensorFlow Debugger (tfdbg). @@ -455,6 +475,7 @@ class DebugDumpDir(object): if not gfile.IsDirectory(dump_root): raise IOError("Dump root directory %s does not exist" % dump_root) + self._core_metadata = None self._load_dumps(dump_root) self._create_tensor_watch_maps() self._load_partition_graphs(partition_graphs, validate) @@ -498,6 +519,9 @@ class DebugDumpDir(object): for root, _, files in gfile.Walk(self._dump_root): for f in files: if f.startswith(METADATA_FILE_PREFIX): + if _is_core_metadata_file(f): + self._load_core_metadata(os.path.join(self._dump_root, root, f)) + if _is_graph_file(f): self._dump_graph_file_paths.append( os.path.join(self._dump_root, root, f)) @@ -526,6 +550,12 @@ class DebugDumpDir(object): else: self._t0 = None + def _load_core_metadata(self, event_file_path): + event = event_pb2.Event() + with gfile.Open(event_file_path, "rb") as f: + event.ParseFromString(f.read()) + self._core_metadata = extract_core_metadata_from_event_proto(event) + def _dump_file_name_to_datum(self, dir_name, file_name): """Obtain a DebugTensorDatum from the directory and file name. @@ -587,6 +617,37 @@ class DebugDumpDir(object): for op in self._python_graph.get_operations(): self._node_traceback[op.name] = op.traceback + @property + def core_metadata(self): + """Metadata about the `Session.run()` call from the core runtime. + + Of the three counters available in the return value, `global_step` is + supplied by the caller of the debugged `Session.run()`, while + `session_run_count` and `executor_step_count` are determined by the state + of the core runtime, automatically. For the same fetch list, feed keys and + debug tensor watch options, the same executor will be used and + `executor_step_count` should increase by one at a time. However, runs with + different fetch lists, feed keys and debug_tensor watch options that all + share the same `Session` object can lead to gaps in `session_run_count`. + + Returns: + If core metadata are loaded, a `namedtuple` with the fields: + `global_step`: A global step count supplied by the caller of + `Session.run()`. It is optional to the caller. If the caller did not + supply this parameter, its value will be -1. + `session_run_count`: A counter for Run() calls to the underlying + TensorFlow `Session` object. + `executor_step_count`: A counter for invocations of a given runtime + executor. The same executor is re-used for the same fetched tensors, + target nodes, input feed keys and debug tensor watch options. + `input_names`: Names of the input (feed) Tensors. + `output_names`: Names of the output (fetched) Tensors. + `target_nodes`: Names of the target nodes. + If the core metadata have not been loaded, `None`. + """ + + return self._core_metadata + @property def dumped_tensor_data(self): return self._dump_tensor_data diff --git a/tensorflow/python/debug/debug_utils.py b/tensorflow/python/debug/debug_utils.py index 3d6d5ad4476..2b8e95b99e0 100644 --- a/tensorflow/python/debug/debug_utils.py +++ b/tensorflow/python/debug/debug_utils.py @@ -27,7 +27,8 @@ def add_debug_tensor_watch(run_options, node_name, output_slot=0, debug_ops="DebugIdentity", - debug_urls=None): + debug_urls=None, + global_step=-1): """Add watch on a `Tensor` to `RunOptions`. N.B.: Under certain circumstances, the `Tensor` may not be actually watched @@ -42,9 +43,12 @@ def add_debug_tensor_watch(run_options, `list` of `str` with only one element. debug_urls: (`str` or `list` of `str`) URL(s) to send debug values to, e.g., `file:///tmp/tfdbg_dump_1`, `grpc://localhost:12345`. + global_step: (`int`) Optional global_step count for this debug tensor + watch. """ watch_opts = run_options.debug_options.debug_tensor_watch_opts + run_options.debug_options.global_step = global_step watch = watch_opts.add() watch.node_name = node_name @@ -67,7 +71,8 @@ def watch_graph(run_options, debug_ops="DebugIdentity", debug_urls=None, node_name_regex_whitelist=None, - op_type_regex_whitelist=None): + op_type_regex_whitelist=None, + global_step=-1): """Add debug watches to `RunOptions` for a TensorFlow graph. To watch all `Tensor`s on the graph, let both `node_name_regex_whitelist` @@ -93,6 +98,8 @@ def watch_graph(run_options, are set, the two filtering operations will occur in a logical `AND` relation. In other words, a node will be included if and only if it hits both whitelists. + global_step: (`int`) Optional global_step count for this debug tensor + watch. """ if isinstance(debug_ops, str): @@ -128,7 +135,8 @@ def watch_graph(run_options, node_name, output_slot=slot, debug_ops=debug_ops, - debug_urls=debug_urls) + debug_urls=debug_urls, + global_step=global_step) def watch_graph_with_blacklists(run_options, @@ -136,7 +144,8 @@ def watch_graph_with_blacklists(run_options, debug_ops="DebugIdentity", debug_urls=None, node_name_regex_blacklist=None, - op_type_regex_blacklist=None): + op_type_regex_blacklist=None, + global_step=-1): """Add debug tensor watches, blacklisting nodes and op types. This is similar to `watch_graph()`, but the node names and op types are @@ -161,6 +170,8 @@ def watch_graph_with_blacklists(run_options, relation. In other words, a node will be excluded if it hits either of the two blacklists; a node will be included if and only if it hits neither of the blacklists. + global_step: (`int`) Optional global_step count for this debug tensor + watch. """ if isinstance(debug_ops, str): @@ -196,4 +207,5 @@ def watch_graph_with_blacklists(run_options, node_name, output_slot=slot, debug_ops=debug_ops, - debug_urls=debug_urls) + debug_urls=debug_urls, + global_step=global_step) diff --git a/tensorflow/python/debug/session_debug_file_test.py b/tensorflow/python/debug/session_debug_file_test.py index 8acd3975b70..ca403e00177 100644 --- a/tensorflow/python/debug/session_debug_file_test.py +++ b/tensorflow/python/debug/session_debug_file_test.py @@ -18,6 +18,8 @@ from __future__ import division from __future__ import print_function import os +import shutil +import tempfile from tensorflow.core.protobuf import config_pb2 from tensorflow.python.client import session @@ -25,6 +27,7 @@ from tensorflow.python.debug import debug_data from tensorflow.python.debug import debug_utils from tensorflow.python.debug import session_debug_testlib from tensorflow.python.framework import constant_op +from tensorflow.python.framework import ops from tensorflow.python.ops import math_ops from tensorflow.python.ops import variables from tensorflow.python.platform import googletest @@ -108,5 +111,25 @@ class SessionDebugTest(session_debug_testlib.SessionDebugTestBase): dump.get_rel_timestamps("%s/read" % v_name, 0, "DebugIdentity")[0], 0) + +class SessionDebugConcurrentTest( + session_debug_testlib.DebugConcurrentRunCallsTest): + + def setUp(self): + self._num_concurrent_runs = 3 + self._dump_roots = [] + for _ in range(self._num_concurrent_runs): + self._dump_roots.append(tempfile.mkdtemp()) + + def tearDown(self): + ops.reset_default_graph() + for dump_root in self._dump_roots: + if os.path.isdir(dump_root): + shutil.rmtree(dump_root) + + def _get_concurrent_debug_urls(self): + return [("file://%s" % dump_root) for dump_root in self._dump_roots] + + if __name__ == "__main__": googletest.main() diff --git a/tensorflow/python/debug/session_debug_testlib.py b/tensorflow/python/debug/session_debug_testlib.py index ba453ef7e09..0306ace88ca 100644 --- a/tensorflow/python/debug/session_debug_testlib.py +++ b/tensorflow/python/debug/session_debug_testlib.py @@ -18,14 +18,18 @@ from __future__ import division from __future__ import print_function import collections +import functools +import glob import os import shutil import tempfile +import threading import numpy as np from six.moves import xrange # pylint: disable=redefined-builtin from tensorflow.core.protobuf import config_pb2 +from tensorflow.core.util import event_pb2 from tensorflow.python.client import session from tensorflow.python.debug import debug_data from tensorflow.python.debug import debug_utils @@ -134,6 +138,15 @@ class SessionDebugTestBase(test_util.TensorFlowTestCase): results = self._generate_dump_from_simple_addition_graph() self.assertTrue(results.dump.loaded_partition_graphs()) + # Since global_step is not explicitly specified, it should take its default + # value: -1. + self.assertEqual(-1, results.dump.core_metadata.global_step) + self.assertGreaterEqual(results.dump.core_metadata.session_run_count, 0) + self.assertGreaterEqual(results.dump.core_metadata.executor_step_count, 0) + self.assertEqual([], results.dump.core_metadata.input_names) + self.assertEqual([results.w.name], results.dump.core_metadata.output_names) + self.assertEqual([], results.dump.core_metadata.target_nodes) + # Verify the dumped tensor values for u and v. self.assertEqual(2, results.dump.size) @@ -869,6 +882,63 @@ class SessionDebugTestBase(test_util.TensorFlowTestCase): self.assertAllClose([0, 0, 1, 2, 2], unique_x_slot_1_dumps[0].get_tensor()) + def testSuccessiveDebuggingRunsIncreasesCounters(self): + """Test repeated Session.run() calls with debugger increments counters.""" + + with session.Session() as sess: + ph = array_ops.placeholder(dtypes.float32, name="successive/ph") + x = array_ops.transpose(ph, name="mismatch/x") + y = array_ops.squeeze(ph, name="mismatch/y") + + run_options = config_pb2.RunOptions(output_partition_graphs=True) + debug_utils.watch_graph( + run_options, sess.graph, debug_urls=self._debug_urls(), global_step=1) + + sess.run(x, feed_dict={ph: np.array([[7.0, 8.0]])}, options=run_options) + dump1 = debug_data.DebugDumpDir(self._dump_root) + self.assertEqual(1, dump1.core_metadata.global_step) + self.assertGreaterEqual(dump1.core_metadata.session_run_count, 0) + self.assertEqual(0, dump1.core_metadata.executor_step_count) + self.assertEqual([ph.name], dump1.core_metadata.input_names) + self.assertEqual([x.name], dump1.core_metadata.output_names) + self.assertEqual([], dump1.core_metadata.target_nodes) + shutil.rmtree(self._dump_root) + + run_options = config_pb2.RunOptions(output_partition_graphs=True) + debug_utils.watch_graph( + run_options, sess.graph, debug_urls=self._debug_urls(), global_step=2) + + # Calling run() with the same feed, same output and same debug watch + # options should increment both session_run_count and + # executor_step_count. + sess.run(x, feed_dict={ph: np.array([[7.0, 8.0]])}, options=run_options) + dump2 = debug_data.DebugDumpDir(self._dump_root) + self.assertEqual(2, dump2.core_metadata.global_step) + self.assertEqual(dump1.core_metadata.session_run_count + 1, + dump2.core_metadata.session_run_count) + self.assertEqual(dump1.core_metadata.executor_step_count + 1, + dump2.core_metadata.executor_step_count) + self.assertEqual([ph.name], dump2.core_metadata.input_names) + self.assertEqual([x.name], dump2.core_metadata.output_names) + self.assertEqual([], dump2.core_metadata.target_nodes) + shutil.rmtree(self._dump_root) + + run_options = config_pb2.RunOptions(output_partition_graphs=True) + debug_utils.watch_graph( + run_options, sess.graph, debug_urls=self._debug_urls(), global_step=3) + + # Calling run() with a different output should increment + # session_run_count, but not executor_step_count. + sess.run(y, feed_dict={ph: np.array([[7.0, 8.0]])}, options=run_options) + dump3 = debug_data.DebugDumpDir(self._dump_root) + self.assertEqual(3, dump3.core_metadata.global_step) + self.assertEqual(dump2.core_metadata.session_run_count + 1, + dump3.core_metadata.session_run_count) + self.assertEqual(0, dump3.core_metadata.executor_step_count) + self.assertEqual([ph.name], dump3.core_metadata.input_names) + self.assertEqual([y.name], dump3.core_metadata.output_names) + self.assertEqual([], dump3.core_metadata.target_nodes) + def testDebuggingDuringOpError(self): """Test the debug tensor dumping when error occurs in graph runtime.""" @@ -894,6 +964,12 @@ class SessionDebugTestBase(test_util.TensorFlowTestCase): dump = debug_data.DebugDumpDir(self._dump_root) + self.assertGreaterEqual(dump.core_metadata.session_run_count, 0) + self.assertGreaterEqual(dump.core_metadata.executor_step_count, 0) + self.assertEqual([ph.name], dump.core_metadata.input_names) + self.assertEqual([y.name], dump.core_metadata.output_names) + self.assertEqual([], dump.core_metadata.target_nodes) + # Despite the fact that the run() call errored out and partition_graphs # are not available via run_metadata, the partition graphs should still # have been loaded from the dump directory. @@ -1045,5 +1121,96 @@ class SessionDebugTestBase(test_util.TensorFlowTestCase): self.assertIsInstance(trace, tuple) +class DebugConcurrentRunCallsTest(test_util.TensorFlowTestCase): + """Test for debugging concurrent Session.run() calls.""" + + def _get_concurrent_debug_urls(self): + """Abstract method to generate debug URLs for concurrent debugged runs.""" + raise NotImplementedError( + "_get_concurrent_debug_urls is not implemented in the base test class") + + def testDebugConcurrentVariableUpdates(self): + if test.is_gpu_available(): + self.skipTest("No testing concurrent runs on a single GPU.") + + with session.Session() as sess: + v = variables.Variable(30.0, name="v") + constants = [] + for i in xrange(self._num_concurrent_runs): + constants.append(constant_op.constant(1.0, name="c%d" % i)) + incs = [ + state_ops.assign_add( + v, c, use_locking=True, name=("inc%d" % i)) + for (i, c) in enumerate(constants) + ] + sess.run(v.initializer) + + concurrent_debug_urls = self._get_concurrent_debug_urls() + + def inc_job(index): + run_options = config_pb2.RunOptions(output_partition_graphs=True) + debug_utils.watch_graph( + run_options, sess.graph, debug_urls=concurrent_debug_urls[index]) + for _ in xrange(100): + sess.run(incs[index], options=run_options) + + inc_threads = [] + for index in xrange(self._num_concurrent_runs): + inc_thread = threading.Thread(target=functools.partial(inc_job, index)) + inc_thread.start() + inc_threads.append(inc_thread) + for inc_thread in inc_threads: + inc_thread.join() + + self.assertAllClose(30.0 + 1.0 * self._num_concurrent_runs * 100, + sess.run(v)) + + all_session_run_counts = [] + for index in xrange(self._num_concurrent_runs): + dump = debug_data.DebugDumpDir(self._dump_roots[index]) + self.assertTrue(dump.loaded_partition_graphs()) + + v_data = dump.get_tensors("v", 0, "DebugIdentity") + self.assertEqual(100, len(v_data)) + + # Examine all the core metadata files + core_metadata_files = glob.glob( + os.path.join(self._dump_roots[index], "_tfdbg_core*")) + + timestamps = [] + session_run_counts = [] + executor_step_counts = [] + for core_metadata_file in core_metadata_files: + with open(core_metadata_file, "rb") as f: + event = event_pb2.Event() + event.ParseFromString(f.read()) + core_metadata = ( + debug_data.extract_core_metadata_from_event_proto(event)) + timestamps.append(event.wall_time) + session_run_counts.append(core_metadata.session_run_count) + executor_step_counts.append(core_metadata.executor_step_count) + + all_session_run_counts.extend(session_run_counts) + + # Assert that executor_step_count increases by one at a time. + executor_step_counts = zip(timestamps, executor_step_counts) + executor_step_counts = sorted(executor_step_counts, key=lambda x: x[0]) + for i in xrange(len(executor_step_counts) - 1): + self.assertEquals(executor_step_counts[i][1] + 1, + executor_step_counts[i + 1][1]) + + # Assert that session_run_count increase monotonically. + session_run_counts = zip(timestamps, session_run_counts) + session_run_counts = sorted(session_run_counts, key=lambda x: x[0]) + for i in xrange(len(session_run_counts) - 1): + self.assertGreater(session_run_counts[i + 1][1], + session_run_counts[i][1]) + + # Assert that the session_run_counts from the concurrent run() calls are + # all unique. + self.assertEqual(len(all_session_run_counts), + len(set(all_session_run_counts))) + + if __name__ == "__main__": googletest.main()