From aabc7972b94af5a678550427534d4fba7fda327c Mon Sep 17 00:00:00 2001
From: Shanqing Cai <cais@google.com>
Date: Wed, 8 Feb 2017 04:32:26 -0800
Subject: [PATCH] tfdbg core: add core metadata to debugger data stream +
 better support of concurrent debugged runs

* Let the debugger send/dump an Event proto holding a JSON string in its log_message.message field. The JSON metadata includes,
1) An optional, client-specified global_step field that defaults to -1 if not supplied
2) A session run count
3) An executor invocation step count
4) Input names (feed keys)
5) Output names (fetched Tensor names)
6) Target node names

* grpc_debug_server.EventListenerBaseServicer now requires a constructor of the type EventListenerBaseStreamHandler and will construct a new handler object from it, for every stream. This leads to better support of concurrent debugged Session::Run() calls.

* Add support for path names in grpc:// URLs, such as "grpc://localhost:6000/thread1". Different path names will lead to separate gRPC streams being opened to the same server:port, supporting concurrent debugged Session::Run() calls.
Change: 146896481
---
 .../common_runtime/debugger_state_interface.h |  18 ++
 .../core/common_runtime/direct_session.cc     |  16 +-
 .../core/common_runtime/direct_session.h      |   4 +-
 tensorflow/core/debug/debug_graph_utils.cc    |  10 ++
 tensorflow/core/debug/debug_graph_utils.h     |  11 ++
 .../core/debug/debug_grpc_io_utils_test.cc    |   8 +-
 tensorflow/core/debug/debug_io_utils.cc       | 126 ++++++++++---
 tensorflow/core/debug/debug_io_utils.h        |  16 +-
 tensorflow/core/protobuf/debug.proto          |  12 ++
 tensorflow/python/debug/BUILD                 |   1 +
 tensorflow/python/debug/debug_data.py         |  61 +++++++
 tensorflow/python/debug/debug_utils.py        |  22 ++-
 .../python/debug/session_debug_file_test.py   |  23 +++
 .../python/debug/session_debug_testlib.py     | 167 ++++++++++++++++++
 14 files changed, 452 insertions(+), 43 deletions(-)

diff --git a/tensorflow/core/common_runtime/debugger_state_interface.h b/tensorflow/core/common_runtime/debugger_state_interface.h
index a4ff6b74fc5..fb72f9fa3ea 100644
--- a/tensorflow/core/common_runtime/debugger_state_interface.h
+++ b/tensorflow/core/common_runtime/debugger_state_interface.h
@@ -39,6 +39,24 @@ class DebuggerStateInterface {
   // record. See the documentation of DebugNodeInserter::InsertNodes() for
   // details.
   virtual Status DecorateGraphForDebug(Graph* graph, Device* device) = 0;
+
+  // Publish metadata about the debugged Session::Run() call.
+  //
+  // Args:
+  //   global_step: A global step count supplied by the caller of
+  //     Session::Run().
+  //   session_run_count: A counter for calls to the Run() method of the
+  //     Session object.
+  //   executor_step_count: A counter for invocations of the executor charged
+  //     to serve this Session::Run() call.
+  //   input_names: Name of the input Tensors (feed keys).
+  //   output_names: Names of the fetched Tensors.
+  //   target_names: Names of the target nodes.
+  virtual Status PublishDebugMetadata(
+      const int64 global_step, const int64 session_run_count,
+      const int64 executor_step_count, const std::vector<string>& input_names,
+      const std::vector<string>& output_names,
+      const std::vector<string>& target_nodes) = 0;
 };
 
 typedef std::function<std::unique_ptr<DebuggerStateInterface>(
diff --git a/tensorflow/core/common_runtime/direct_session.cc b/tensorflow/core/common_runtime/direct_session.cc
index 953c4180fd7..f00f5ffd8f7 100644
--- a/tensorflow/core/common_runtime/direct_session.cc
+++ b/tensorflow/core/common_runtime/direct_session.cc
@@ -397,6 +397,9 @@ Status DirectSession::Run(const RunOptions& run_options,
   ExecutorsAndKeys* executors_and_keys;
   RunStateArgs run_state_args;
 
+  Executor::Args args;
+  args.step_id = step_id_counter_.fetch_add(1);
+
   // EXPERIMENTAL: Options that allow the client to insert nodes into partition
   // graphs for debugging.
   if (!run_options.debug_options().debug_tensor_watch_opts().empty()) {
@@ -407,10 +410,15 @@ Status DirectSession::Run(const RunOptions& run_options,
   TF_RETURN_IF_ERROR(
       GetOrCreateExecutors(pool, input_tensor_names, output_names, target_nodes,
                            &executors_and_keys, &run_state_args));
+  const int64 executor_step_count = executors_and_keys->step_count.fetch_add(1);
+
+  if (run_state_args.debugger_state) {
+    TF_RETURN_IF_ERROR(run_state_args.debugger_state->PublishDebugMetadata(
+        run_options.debug_options().global_step(), args.step_id,
+        executor_step_count, input_tensor_names, output_names, target_nodes));
+  }
 
   // Create a run state and start execution.
-  Executor::Args args;
-  args.step_id = step_id_counter_.fetch_add(1);
   RunState run_state(args.step_id, &devices_);
   run_state.rendez = new IntraProcessRendezvous(device_mgr_.get());
   CancellationManager step_cancellation_manager;
@@ -450,8 +458,7 @@ Status DirectSession::Run(const RunOptions& run_options,
         options_.config.graph_options().build_cost_model();
     const int64 build_cost_model_after =
         options_.config.graph_options().build_cost_model_after();
-    int measure_step_count =
-        executors_and_keys->step_count - build_cost_model_after;
+    int measure_step_count = executor_step_count - build_cost_model_after;
     if (measure_step_count >= 0) {
       update_cost_model =
           ((measure_step_count + 1) % build_cost_model_every == 0);
@@ -527,7 +534,6 @@ Status DirectSession::Run(const RunOptions& run_options,
 
   // Build and return the cost model as instructed.
   mutex_lock l(executor_lock_);
-  ++executors_and_keys->step_count;
   if (update_cost_model) {
     // Build the cost model
     std::unordered_map<string, const Graph*> device_to_graph;
diff --git a/tensorflow/core/common_runtime/direct_session.h b/tensorflow/core/common_runtime/direct_session.h
index ff4786f998d..3e3a5eaa8f4 100644
--- a/tensorflow/core/common_runtime/direct_session.h
+++ b/tensorflow/core/common_runtime/direct_session.h
@@ -125,7 +125,9 @@ class DirectSession : public Session {
   // library. Consider giving each partition its own function library to enable
   // per-partition rewrites.
   struct ExecutorsAndKeys {
-    int64 step_count = 0;
+    ExecutorsAndKeys() : step_count(0) {}
+
+    std::atomic_int_fast64_t step_count;
     std::unique_ptr<Graph> graph;
     NameNodeMap name_to_node;
     std::unique_ptr<FunctionLibraryDefinition> flib_def;
diff --git a/tensorflow/core/debug/debug_graph_utils.cc b/tensorflow/core/debug/debug_graph_utils.cc
index 06c129ecc84..217022759bc 100644
--- a/tensorflow/core/debug/debug_graph_utils.cc
+++ b/tensorflow/core/debug/debug_graph_utils.cc
@@ -75,6 +75,16 @@ Status DebuggerState::DecorateGraphForDebug(Graph* graph, Device* device) {
   return status;
 }
 
+Status DebuggerState::PublishDebugMetadata(
+    const int64 global_step, const int64 session_run_count,
+    const int64 executor_step_count, const std::vector<string>& input_names,
+    const std::vector<string>& output_names,
+    const std::vector<string>& target_nodes) {
+  return DebugIO::PublishDebugMetadata(global_step, session_run_count,
+                                       executor_step_count, input_names,
+                                       output_names, target_nodes, debug_urls_);
+}
+
 // static
 Status DebugNodeInserter::InsertNodes(
     const protobuf::RepeatedPtrField<DebugTensorWatch>& watches, Graph* graph,
diff --git a/tensorflow/core/debug/debug_graph_utils.h b/tensorflow/core/debug/debug_graph_utils.h
index cda2c31f0ce..6edd26c2602 100644
--- a/tensorflow/core/debug/debug_graph_utils.h
+++ b/tensorflow/core/debug/debug_graph_utils.h
@@ -43,6 +43,17 @@ class DebuggerState : public DebuggerStateInterface {
 
   const protobuf::RepeatedPtrField<DebugTensorWatch>& watches;
 
+  // Publish metadata about the debugged Session::Run() call.
+  //
+  // See the doc string of DebuggerStateInterface::PublishDebugMetadata() for
+  // details.
+  Status PublishDebugMetadata(const int64 global_step,
+                              const int64 session_run_count,
+                              const int64 executor_step_count,
+                              const std::vector<string>& input_names,
+                              const std::vector<string>& output_names,
+                              const std::vector<string>& target_names);
+
  private:
   std::unordered_set<string> debug_urls_;
 };
diff --git a/tensorflow/core/debug/debug_grpc_io_utils_test.cc b/tensorflow/core/debug/debug_grpc_io_utils_test.cc
index acf1224be19..bb79528cc19 100644
--- a/tensorflow/core/debug/debug_grpc_io_utils_test.cc
+++ b/tensorflow/core/debug/debug_grpc_io_utils_test.cc
@@ -80,10 +80,10 @@ TEST_F(GrpcDebugTest, AttemptToSendToNonexistentGrpcAddress) {
       "foo_tensor", "DebugIdentity", tensor, Env::Default()->NowMicros(),
       {kInvalidGrpcUrl});
   ASSERT_FALSE(publish_status.ok());
-  ASSERT_NE(
-      string::npos,
-      publish_status.error_message().find(
-          "Channel at the following gRPC address is not ready: 0.0.0.0:0"));
+  ASSERT_NE(string::npos,
+            publish_status.error_message().find(
+                "Channel at the following gRPC stream URL is not ready: "
+                "grpc://0.0.0.0:0"));
 
   DebugIO::CloseDebugURL(kInvalidGrpcUrl);
 }
diff --git a/tensorflow/core/debug/debug_io_utils.cc b/tensorflow/core/debug/debug_io_utils.cc
index 78196007650..f4136dcabaa 100644
--- a/tensorflow/core/debug/debug_io_utils.cc
+++ b/tensorflow/core/debug/debug_io_utils.cc
@@ -98,6 +98,83 @@ const char* const DebugIO::kFileURLScheme = "file://";
 // static
 const char* const DebugIO::kGrpcURLScheme = "grpc://";
 
+// static
+Status DebugIO::PublishDebugMetadata(
+    const int64 global_step, const int64 session_run_count,
+    const int64 executor_step_count, const std::vector<string>& input_names,
+    const std::vector<string>& output_names,
+    const std::vector<string>& target_nodes,
+    const std::unordered_set<string>& debug_urls) {
+  std::ostringstream oss;
+
+  // Construct a JSON string to carry the metadata.
+  oss << "{";
+  oss << "\"global_step\":" << global_step << ",";
+  oss << "\"session_run_count\":" << session_run_count << ",";
+  oss << "\"executor_step_count\":" << executor_step_count << ",";
+  oss << "\"input_names\":[";
+  for (size_t i = 0; i < input_names.size(); ++i) {
+    oss << "\"" << input_names[i] << "\"";
+    if (i < input_names.size() - 1) {
+      oss << ",";
+    }
+  }
+  oss << "],";
+  oss << "\"output_names\":[";
+  for (size_t i = 0; i < output_names.size(); ++i) {
+    oss << "\"" << output_names[i] << "\"";
+    if (i < output_names.size() - 1) {
+      oss << ",";
+    }
+  }
+  oss << "],";
+  oss << "\"target_nodes\":[";
+  for (size_t i = 0; i < target_nodes.size(); ++i) {
+    oss << "\"" << target_nodes[i] << "\"";
+    if (i < target_nodes.size() - 1) {
+      oss << ",";
+    }
+  }
+  oss << "]";
+  oss << "}";
+
+  const string json_metadata = oss.str();
+  Event event;
+  event.set_wall_time(static_cast<double>(Env::Default()->NowMicros()));
+  LogMessage* log_message = event.mutable_log_message();
+  log_message->set_message(json_metadata);
+
+  Status status;
+  for (const string& url : debug_urls) {
+    if (str_util::Lowercase(url).find(kGrpcURLScheme) == 0) {
+      Event grpc_event;
+
+      // Determine the path (if any) in the grpc:// URL, and add it as a field
+      // of the JSON string.
+      const string address = url.substr(strlen(DebugIO::kFileURLScheme));
+      const string path = address.find("/") == string::npos
+                              ? ""
+                              : address.substr(address.find("/"));
+      grpc_event.set_wall_time(event.wall_time());
+      LogMessage* log_message_grpc = grpc_event.mutable_log_message();
+      log_message_grpc->set_message(
+          strings::StrCat(json_metadata.substr(0, json_metadata.size() - 1),
+                          ",\"grpc_path\":\"", path, "\"}"));
+
+      status.Update(
+          DebugGrpcIO::SendEventProtoThroughGrpcStream(grpc_event, url));
+    } else if (str_util::Lowercase(url).find(kFileURLScheme) == 0) {
+      const string dump_root_dir = url.substr(strlen(kFileURLScheme));
+      const string file_name =
+          strings::StrCat("_tfdbg_core_metadata_", Env::Default()->NowMicros());
+      status.Update(
+          DebugFileIO::DumpEventProtoToFile(event, dump_root_dir, file_name));
+    }
+  }
+
+  return status;
+}
+
 // static
 Status DebugIO::PublishDebugTensor(const string& tensor_name,
                                    const string& debug_op, const Tensor& tensor,
@@ -136,10 +213,8 @@ Status DebugIO::PublishDebugTensor(const string& tensor_name,
         fail_statuses.push_back(s);
       }
     } else if (str_util::Lowercase(url).find(kGrpcURLScheme) == 0) {
-      const string grpc_server_stream_addr = url.substr(strlen(kGrpcURLScheme));
       Status s = DebugGrpcIO::SendTensorThroughGrpcStream(
-          node_name, output_slot, debug_op, tensor, wall_time_us,
-          grpc_server_stream_addr);
+          node_name, output_slot, debug_op, tensor, wall_time_us, url);
 
       if (!s.ok()) {
         num_failed_urls++;
@@ -189,8 +264,7 @@ Status DebugIO::PublishGraph(const Graph& graph,
       status.Update(
           DebugFileIO::DumpEventProtoToFile(event, dump_root_dir, file_name));
     } else if (debug_url.find(kGrpcURLScheme) == 0) {
-      DebugGrpcIO::SendEventProtoThroughGrpcStream(
-          event, debug_url.substr(strlen(kGrpcURLScheme)));
+      DebugGrpcIO::SendEventProtoThroughGrpcStream(event, debug_url);
     }
   }
 
@@ -200,8 +274,7 @@ Status DebugIO::PublishGraph(const Graph& graph,
 // static
 Status DebugIO::CloseDebugURL(const string& debug_url) {
   if (debug_url.find(DebugIO::kGrpcURLScheme) == 0) {
-    return DebugGrpcIO::CloseGrpcStream(
-        debug_url.substr(strlen(DebugIO::kGrpcURLScheme)));
+    return DebugGrpcIO::CloseGrpcStream(debug_url);
   } else {
     // No-op for non-gRPC URLs.
     return Status::OK();
@@ -348,57 +421,64 @@ std::unordered_map<string, std::shared_ptr<DebugGrpcChannel>>
     DebugGrpcIO::stream_channels;
 
 // static
-Status DebugGrpcIO::SendTensorThroughGrpcStream(
-    const string& node_name, const int32 output_slot, const string& debug_op,
-    const Tensor& tensor, const uint64 wall_time_us,
-    const string& server_stream_addr) {
+Status DebugGrpcIO::SendTensorThroughGrpcStream(const string& node_name,
+                                                const int32 output_slot,
+                                                const string& debug_op,
+                                                const Tensor& tensor,
+                                                const uint64 wall_time_us,
+                                                const string& grpc_stream_url) {
   const string tensor_name = strings::StrCat(node_name, ":", output_slot);
 
   // Prepare tensor Event data to be sent.
   Event event = WrapTensorAsEvent(tensor_name, debug_op, tensor, wall_time_us);
 
-  return SendEventProtoThroughGrpcStream(event, server_stream_addr);
+  return SendEventProtoThroughGrpcStream(event, grpc_stream_url);
 }
 
 // static
 Status DebugGrpcIO::SendEventProtoThroughGrpcStream(
-    const Event& event_proto, const string& server_stream_addr) {
+    const Event& event_proto, const string& grpc_stream_url) {
+  const string addr_with_path =
+      grpc_stream_url.substr(strlen(DebugIO::kFileURLScheme));
+  const string server_stream_addr =
+      addr_with_path.substr(0, addr_with_path.find('/'));
+
   std::shared_ptr<DebugGrpcChannel> debug_grpc_channel;
   {
     mutex_lock l(streams_mu);
-    if (stream_channels.find(server_stream_addr) == stream_channels.end()) {
+    if (stream_channels.find(grpc_stream_url) == stream_channels.end()) {
       debug_grpc_channel.reset(new DebugGrpcChannel(server_stream_addr));
 
       if (!debug_grpc_channel->is_channel_ready()) {
         return errors::FailedPrecondition(
-            strings::StrCat("Channel at the following gRPC address is ",
-                            "not ready: ", server_stream_addr));
+            strings::StrCat("Channel at the following gRPC stream URL is ",
+                            "not ready: ", grpc_stream_url));
       }
 
-      stream_channels[server_stream_addr] = debug_grpc_channel;
+      stream_channels[grpc_stream_url] = debug_grpc_channel;
     } else {
-      debug_grpc_channel = stream_channels[server_stream_addr];
+      debug_grpc_channel = stream_channels[grpc_stream_url];
     }
   }
 
   bool write_ok = debug_grpc_channel->WriteEvent(event_proto);
   if (!write_ok) {
     return errors::Cancelled(strings::StrCat("Write event to stream URL ",
-                                             server_stream_addr, "failed."));
+                                             grpc_stream_url, "failed."));
   }
 
   return Status::OK();
 }
 
-Status DebugGrpcIO::CloseGrpcStream(const string& server_stream_addr) {
+Status DebugGrpcIO::CloseGrpcStream(const string& grpc_stream_url) {
   mutex_lock l(streams_mu);
 
-  if (stream_channels.find(server_stream_addr) != stream_channels.end()) {
+  if (stream_channels.find(grpc_stream_url) != stream_channels.end()) {
     // Stream of the specified address exists. Close it and remove it from
     // record.
     Status s;
-    s = stream_channels[server_stream_addr]->Close();
-    stream_channels.erase(server_stream_addr);
+    s = stream_channels[grpc_stream_url]->Close();
+    stream_channels.erase(grpc_stream_url);
     return s;
   } else {
     // Stream of the specified address does not exist. No action.
diff --git a/tensorflow/core/debug/debug_io_utils.h b/tensorflow/core/debug/debug_io_utils.h
index a10e24386ad..a12bea6dbcf 100644
--- a/tensorflow/core/debug/debug_io_utils.h
+++ b/tensorflow/core/debug/debug_io_utils.h
@@ -32,6 +32,13 @@ Status ReadEventFromFile(const string& dump_file_path, Event* event);
 
 class DebugIO {
  public:
+  static Status PublishDebugMetadata(
+      const int64 global_step, const int64 session_run_count,
+      const int64 executor_step_count, const std::vector<string>& input_names,
+      const std::vector<string>& output_names,
+      const std::vector<string>& target_nodes,
+      const std::unordered_set<string>& debug_urls);
+
   // Publish a tensor to a debug target URL.
   //
   // Args:
@@ -59,7 +66,6 @@ class DebugIO {
 
   static Status CloseDebugURL(const string& debug_url);
 
- private:
   static const char* const kFileURLScheme;
   static const char* const kGrpcURLScheme;
 };
@@ -172,18 +178,18 @@ class DebugGrpcIO {
                                             const string& debug_op,
                                             const Tensor& tensor,
                                             const uint64 wall_time_us,
-                                            const string& server_stream_addr);
+                                            const string& grpc_stream_url);
 
   // Send an Event proto through a debug gRPC stream.
   // Thread-safety: Safe with respect to other calls to the same method and
   // calls to CloseGrpcStream().
-  static Status SendEventProtoThroughGrpcStream(
-      const Event& event_proto, const string& server_stream_addr);
+  static Status SendEventProtoThroughGrpcStream(const Event& event_proto,
+                                                const string& grpc_stream_url);
 
   // Close a gRPC stream to the given address, if it exists.
   // Thread-safety: Safe with respect to other calls to the same method and
   // calls to SendTensorThroughGrpcStream().
-  static Status CloseGrpcStream(const string& server_stream_addr);
+  static Status CloseGrpcStream(const string& grpc_stream_url);
 
  private:
   static mutex streams_mu;
diff --git a/tensorflow/core/protobuf/debug.proto b/tensorflow/core/protobuf/debug.proto
index 5b32f9fc0b6..f35d7569be8 100644
--- a/tensorflow/core/protobuf/debug.proto
+++ b/tensorflow/core/protobuf/debug.proto
@@ -27,6 +27,13 @@ message DebugTensorWatch {
   //   E.g., "file:///foo/tfdbg_dump", "grpc://localhost:11011"
   // Each debug op listed in debug_ops will publish its output tensor (debug
   // signal) to all URLs in debug_urls.
+  //
+  // N.B. Session::Run() supports concurrent invocations of the same inputs
+  // (feed keys), outputs and target nodes. If such concurrent invocations
+  // are to be debugged, the callers of Session::Run() must use distinct
+  // debug_urls to make sure that the streamed or dumped events do not overlap
+  // among the invocations.
+  // TODO(cais): More visible documentation of this in g3docs.
   repeated string debug_urls = 4;
 }
 
@@ -34,4 +41,9 @@ message DebugTensorWatch {
 message DebugOptions {
   // Debugging options
   repeated DebugTensorWatch debug_tensor_watch_opts = 4;
+
+  // Caller-specified global step count.
+  // Note that this is distinct from the session run count and the executor
+  // step count.
+  int64 global_step = 10;
 }
diff --git a/tensorflow/python/debug/BUILD b/tensorflow/python/debug/BUILD
index 339a6a72e0d..4a83e33ae18 100644
--- a/tensorflow/python/debug/BUILD
+++ b/tensorflow/python/debug/BUILD
@@ -436,6 +436,7 @@ cuda_py_test(
         "//tensorflow/python:platform_test",
         "//tensorflow/python:variables",
     ],
+    tags = ["notsan"],
 )
 
 py_test(
diff --git a/tensorflow/python/debug/debug_data.py b/tensorflow/python/debug/debug_data.py
index 110ce376073..9b1796eb490 100644
--- a/tensorflow/python/debug/debug_data.py
+++ b/tensorflow/python/debug/debug_data.py
@@ -19,6 +19,7 @@ from __future__ import division
 from __future__ import print_function
 
 import collections
+import json
 import os
 
 import numpy as np
@@ -31,6 +32,7 @@ from tensorflow.python.platform import gfile
 
 
 METADATA_FILE_PREFIX = "_tfdbg_"
+CORE_METADATA_TAG = "core_metadata_"
 GRAPH_FILE_TAG = "graph_"
 FETCHES_INFO_FILE_TAG = "fetches_info_"
 FEED_KEYS_INFO_FILE_TAG = "feed_keys_info_"
@@ -110,6 +112,10 @@ def parse_node_or_tensor_name(name):
     return name, None
 
 
+def _is_core_metadata_file(file_name):
+  return file_name.startswith(METADATA_FILE_PREFIX + CORE_METADATA_TAG)
+
+
 def _is_graph_file(file_name):
   return file_name.startswith(METADATA_FILE_PREFIX + GRAPH_FILE_TAG)
 
@@ -274,6 +280,20 @@ def has_inf_or_nan(datum, tensor):
     return False
 
 
+def extract_core_metadata_from_event_proto(event):
+  json_metadata = json.loads(event.log_message.message)
+  core_metadata = collections.namedtuple("CoreMetadata", [
+      "global_step", "session_run_count", "executor_step_count", "input_names",
+      "output_names", "target_nodes"
+  ])
+  return core_metadata(json_metadata["global_step"],
+                       json_metadata["session_run_count"],
+                       json_metadata["executor_step_count"],
+                       json_metadata["input_names"],
+                       json_metadata["output_names"],
+                       json_metadata["target_nodes"])
+
+
 class DebugTensorDatum(object):
   """A single tensor dumped by TensorFlow Debugger (tfdbg).
 
@@ -455,6 +475,7 @@ class DebugDumpDir(object):
     if not gfile.IsDirectory(dump_root):
       raise IOError("Dump root directory %s does not exist" % dump_root)
 
+    self._core_metadata = None
     self._load_dumps(dump_root)
     self._create_tensor_watch_maps()
     self._load_partition_graphs(partition_graphs, validate)
@@ -498,6 +519,9 @@ class DebugDumpDir(object):
     for root, _, files in gfile.Walk(self._dump_root):
       for f in files:
         if f.startswith(METADATA_FILE_PREFIX):
+          if _is_core_metadata_file(f):
+            self._load_core_metadata(os.path.join(self._dump_root, root, f))
+
           if _is_graph_file(f):
             self._dump_graph_file_paths.append(
                 os.path.join(self._dump_root, root, f))
@@ -526,6 +550,12 @@ class DebugDumpDir(object):
     else:
       self._t0 = None
 
+  def _load_core_metadata(self, event_file_path):
+    event = event_pb2.Event()
+    with gfile.Open(event_file_path, "rb") as f:
+      event.ParseFromString(f.read())
+      self._core_metadata = extract_core_metadata_from_event_proto(event)
+
   def _dump_file_name_to_datum(self, dir_name, file_name):
     """Obtain a DebugTensorDatum from the directory and file name.
 
@@ -587,6 +617,37 @@ class DebugDumpDir(object):
       for op in self._python_graph.get_operations():
         self._node_traceback[op.name] = op.traceback
 
+  @property
+  def core_metadata(self):
+    """Metadata about the `Session.run()` call from the core runtime.
+
+    Of the three counters available in the return value, `global_step` is
+    supplied by the caller of the debugged `Session.run()`, while
+    `session_run_count` and `executor_step_count` are determined by the state
+    of the core runtime, automatically. For the same fetch list, feed keys and
+    debug tensor watch options, the same executor will be used and
+    `executor_step_count` should increase by one at a time. However, runs with
+    different fetch lists, feed keys and debug_tensor watch options that all
+    share the same `Session` object can lead to gaps in `session_run_count`.
+
+    Returns:
+      If core metadata are loaded, a `namedtuple` with the fields:
+        `global_step`: A global step count supplied by the caller of
+          `Session.run()`. It is optional to the caller. If the caller did not
+          supply this parameter, its value will be -1.
+        `session_run_count`: A counter for Run() calls to the underlying
+          TensorFlow `Session` object.
+        `executor_step_count`: A counter for invocations of a given runtime
+          executor. The same executor is re-used for the same fetched tensors,
+          target nodes, input feed keys and debug tensor watch options.
+        `input_names`: Names of the input (feed) Tensors.
+        `output_names`: Names of the output (fetched) Tensors.
+        `target_nodes`: Names of the target nodes.
+      If the core metadata have not been loaded, `None`.
+    """
+
+    return self._core_metadata
+
   @property
   def dumped_tensor_data(self):
     return self._dump_tensor_data
diff --git a/tensorflow/python/debug/debug_utils.py b/tensorflow/python/debug/debug_utils.py
index 3d6d5ad4476..2b8e95b99e0 100644
--- a/tensorflow/python/debug/debug_utils.py
+++ b/tensorflow/python/debug/debug_utils.py
@@ -27,7 +27,8 @@ def add_debug_tensor_watch(run_options,
                            node_name,
                            output_slot=0,
                            debug_ops="DebugIdentity",
-                           debug_urls=None):
+                           debug_urls=None,
+                           global_step=-1):
   """Add watch on a `Tensor` to `RunOptions`.
 
   N.B.: Under certain circumstances, the `Tensor` may not be actually watched
@@ -42,9 +43,12 @@ def add_debug_tensor_watch(run_options,
       `list` of `str` with only one element.
     debug_urls: (`str` or `list` of `str`) URL(s) to send debug values to,
       e.g., `file:///tmp/tfdbg_dump_1`, `grpc://localhost:12345`.
+    global_step: (`int`) Optional global_step count for this debug tensor
+      watch.
   """
 
   watch_opts = run_options.debug_options.debug_tensor_watch_opts
+  run_options.debug_options.global_step = global_step
 
   watch = watch_opts.add()
   watch.node_name = node_name
@@ -67,7 +71,8 @@ def watch_graph(run_options,
                 debug_ops="DebugIdentity",
                 debug_urls=None,
                 node_name_regex_whitelist=None,
-                op_type_regex_whitelist=None):
+                op_type_regex_whitelist=None,
+                global_step=-1):
   """Add debug watches to `RunOptions` for a TensorFlow graph.
 
   To watch all `Tensor`s on the graph, let both `node_name_regex_whitelist`
@@ -93,6 +98,8 @@ def watch_graph(run_options,
       are set, the two filtering operations will occur in a logical `AND`
       relation. In other words, a node will be included if and only if it
       hits both whitelists.
+    global_step: (`int`) Optional global_step count for this debug tensor
+      watch.
   """
 
   if isinstance(debug_ops, str):
@@ -128,7 +135,8 @@ def watch_graph(run_options,
           node_name,
           output_slot=slot,
           debug_ops=debug_ops,
-          debug_urls=debug_urls)
+          debug_urls=debug_urls,
+          global_step=global_step)
 
 
 def watch_graph_with_blacklists(run_options,
@@ -136,7 +144,8 @@ def watch_graph_with_blacklists(run_options,
                                 debug_ops="DebugIdentity",
                                 debug_urls=None,
                                 node_name_regex_blacklist=None,
-                                op_type_regex_blacklist=None):
+                                op_type_regex_blacklist=None,
+                                global_step=-1):
   """Add debug tensor watches, blacklisting nodes and op types.
 
   This is similar to `watch_graph()`, but the node names and op types are
@@ -161,6 +170,8 @@ def watch_graph_with_blacklists(run_options,
       relation. In other words, a node will be excluded if it hits either of
       the two blacklists; a node will be included if and only if it hits
       neither of the blacklists.
+    global_step: (`int`) Optional global_step count for this debug tensor
+      watch.
   """
 
   if isinstance(debug_ops, str):
@@ -196,4 +207,5 @@ def watch_graph_with_blacklists(run_options,
           node_name,
           output_slot=slot,
           debug_ops=debug_ops,
-          debug_urls=debug_urls)
+          debug_urls=debug_urls,
+          global_step=global_step)
diff --git a/tensorflow/python/debug/session_debug_file_test.py b/tensorflow/python/debug/session_debug_file_test.py
index 8acd3975b70..ca403e00177 100644
--- a/tensorflow/python/debug/session_debug_file_test.py
+++ b/tensorflow/python/debug/session_debug_file_test.py
@@ -18,6 +18,8 @@ from __future__ import division
 from __future__ import print_function
 
 import os
+import shutil
+import tempfile
 
 from tensorflow.core.protobuf import config_pb2
 from tensorflow.python.client import session
@@ -25,6 +27,7 @@ from tensorflow.python.debug import debug_data
 from tensorflow.python.debug import debug_utils
 from tensorflow.python.debug import session_debug_testlib
 from tensorflow.python.framework import constant_op
+from tensorflow.python.framework import ops
 from tensorflow.python.ops import math_ops
 from tensorflow.python.ops import variables
 from tensorflow.python.platform import googletest
@@ -108,5 +111,25 @@ class SessionDebugTest(session_debug_testlib.SessionDebugTestBase):
               dump.get_rel_timestamps("%s/read" % v_name, 0,
                                       "DebugIdentity")[0], 0)
 
+
+class SessionDebugConcurrentTest(
+    session_debug_testlib.DebugConcurrentRunCallsTest):
+
+  def setUp(self):
+    self._num_concurrent_runs = 3
+    self._dump_roots = []
+    for _ in range(self._num_concurrent_runs):
+      self._dump_roots.append(tempfile.mkdtemp())
+
+  def tearDown(self):
+    ops.reset_default_graph()
+    for dump_root in self._dump_roots:
+      if os.path.isdir(dump_root):
+        shutil.rmtree(dump_root)
+
+  def _get_concurrent_debug_urls(self):
+    return [("file://%s" % dump_root) for dump_root in self._dump_roots]
+
+
 if __name__ == "__main__":
   googletest.main()
diff --git a/tensorflow/python/debug/session_debug_testlib.py b/tensorflow/python/debug/session_debug_testlib.py
index ba453ef7e09..0306ace88ca 100644
--- a/tensorflow/python/debug/session_debug_testlib.py
+++ b/tensorflow/python/debug/session_debug_testlib.py
@@ -18,14 +18,18 @@ from __future__ import division
 from __future__ import print_function
 
 import collections
+import functools
+import glob
 import os
 import shutil
 import tempfile
+import threading
 
 import numpy as np
 from six.moves import xrange  # pylint: disable=redefined-builtin
 
 from tensorflow.core.protobuf import config_pb2
+from tensorflow.core.util import event_pb2
 from tensorflow.python.client import session
 from tensorflow.python.debug import debug_data
 from tensorflow.python.debug import debug_utils
@@ -134,6 +138,15 @@ class SessionDebugTestBase(test_util.TensorFlowTestCase):
     results = self._generate_dump_from_simple_addition_graph()
     self.assertTrue(results.dump.loaded_partition_graphs())
 
+    # Since global_step is not explicitly specified, it should take its default
+    # value: -1.
+    self.assertEqual(-1, results.dump.core_metadata.global_step)
+    self.assertGreaterEqual(results.dump.core_metadata.session_run_count, 0)
+    self.assertGreaterEqual(results.dump.core_metadata.executor_step_count, 0)
+    self.assertEqual([], results.dump.core_metadata.input_names)
+    self.assertEqual([results.w.name], results.dump.core_metadata.output_names)
+    self.assertEqual([], results.dump.core_metadata.target_nodes)
+
     # Verify the dumped tensor values for u and v.
     self.assertEqual(2, results.dump.size)
 
@@ -869,6 +882,63 @@ class SessionDebugTestBase(test_util.TensorFlowTestCase):
       self.assertAllClose([0, 0, 1, 2, 2],
                           unique_x_slot_1_dumps[0].get_tensor())
 
+  def testSuccessiveDebuggingRunsIncreasesCounters(self):
+    """Test repeated Session.run() calls with debugger increments counters."""
+
+    with session.Session() as sess:
+      ph = array_ops.placeholder(dtypes.float32, name="successive/ph")
+      x = array_ops.transpose(ph, name="mismatch/x")
+      y = array_ops.squeeze(ph, name="mismatch/y")
+
+      run_options = config_pb2.RunOptions(output_partition_graphs=True)
+      debug_utils.watch_graph(
+          run_options, sess.graph, debug_urls=self._debug_urls(), global_step=1)
+
+      sess.run(x, feed_dict={ph: np.array([[7.0, 8.0]])}, options=run_options)
+      dump1 = debug_data.DebugDumpDir(self._dump_root)
+      self.assertEqual(1, dump1.core_metadata.global_step)
+      self.assertGreaterEqual(dump1.core_metadata.session_run_count, 0)
+      self.assertEqual(0, dump1.core_metadata.executor_step_count)
+      self.assertEqual([ph.name], dump1.core_metadata.input_names)
+      self.assertEqual([x.name], dump1.core_metadata.output_names)
+      self.assertEqual([], dump1.core_metadata.target_nodes)
+      shutil.rmtree(self._dump_root)
+
+      run_options = config_pb2.RunOptions(output_partition_graphs=True)
+      debug_utils.watch_graph(
+          run_options, sess.graph, debug_urls=self._debug_urls(), global_step=2)
+
+      # Calling run() with the same feed, same output and same debug watch
+      # options should increment both session_run_count and
+      # executor_step_count.
+      sess.run(x, feed_dict={ph: np.array([[7.0, 8.0]])}, options=run_options)
+      dump2 = debug_data.DebugDumpDir(self._dump_root)
+      self.assertEqual(2, dump2.core_metadata.global_step)
+      self.assertEqual(dump1.core_metadata.session_run_count + 1,
+                       dump2.core_metadata.session_run_count)
+      self.assertEqual(dump1.core_metadata.executor_step_count + 1,
+                       dump2.core_metadata.executor_step_count)
+      self.assertEqual([ph.name], dump2.core_metadata.input_names)
+      self.assertEqual([x.name], dump2.core_metadata.output_names)
+      self.assertEqual([], dump2.core_metadata.target_nodes)
+      shutil.rmtree(self._dump_root)
+
+      run_options = config_pb2.RunOptions(output_partition_graphs=True)
+      debug_utils.watch_graph(
+          run_options, sess.graph, debug_urls=self._debug_urls(), global_step=3)
+
+      # Calling run() with a different output should increment
+      # session_run_count, but not executor_step_count.
+      sess.run(y, feed_dict={ph: np.array([[7.0, 8.0]])}, options=run_options)
+      dump3 = debug_data.DebugDumpDir(self._dump_root)
+      self.assertEqual(3, dump3.core_metadata.global_step)
+      self.assertEqual(dump2.core_metadata.session_run_count + 1,
+                       dump3.core_metadata.session_run_count)
+      self.assertEqual(0, dump3.core_metadata.executor_step_count)
+      self.assertEqual([ph.name], dump3.core_metadata.input_names)
+      self.assertEqual([y.name], dump3.core_metadata.output_names)
+      self.assertEqual([], dump3.core_metadata.target_nodes)
+
   def testDebuggingDuringOpError(self):
     """Test the debug tensor dumping when error occurs in graph runtime."""
 
@@ -894,6 +964,12 @@ class SessionDebugTestBase(test_util.TensorFlowTestCase):
 
       dump = debug_data.DebugDumpDir(self._dump_root)
 
+      self.assertGreaterEqual(dump.core_metadata.session_run_count, 0)
+      self.assertGreaterEqual(dump.core_metadata.executor_step_count, 0)
+      self.assertEqual([ph.name], dump.core_metadata.input_names)
+      self.assertEqual([y.name], dump.core_metadata.output_names)
+      self.assertEqual([], dump.core_metadata.target_nodes)
+
       # Despite the fact that the run() call errored out and partition_graphs
       # are not available via run_metadata, the partition graphs should still
       # have been loaded from the dump directory.
@@ -1045,5 +1121,96 @@ class SessionDebugTestBase(test_util.TensorFlowTestCase):
         self.assertIsInstance(trace, tuple)
 
 
+class DebugConcurrentRunCallsTest(test_util.TensorFlowTestCase):
+  """Test for debugging concurrent Session.run() calls."""
+
+  def _get_concurrent_debug_urls(self):
+    """Abstract method to generate debug URLs for concurrent debugged runs."""
+    raise NotImplementedError(
+        "_get_concurrent_debug_urls is not implemented in the base test class")
+
+  def testDebugConcurrentVariableUpdates(self):
+    if test.is_gpu_available():
+      self.skipTest("No testing concurrent runs on a single GPU.")
+
+    with session.Session() as sess:
+      v = variables.Variable(30.0, name="v")
+      constants = []
+      for i in xrange(self._num_concurrent_runs):
+        constants.append(constant_op.constant(1.0, name="c%d" % i))
+      incs = [
+          state_ops.assign_add(
+              v, c, use_locking=True, name=("inc%d" % i))
+          for (i, c) in enumerate(constants)
+      ]
+      sess.run(v.initializer)
+
+      concurrent_debug_urls = self._get_concurrent_debug_urls()
+
+      def inc_job(index):
+        run_options = config_pb2.RunOptions(output_partition_graphs=True)
+        debug_utils.watch_graph(
+            run_options, sess.graph, debug_urls=concurrent_debug_urls[index])
+        for _ in xrange(100):
+          sess.run(incs[index], options=run_options)
+
+      inc_threads = []
+      for index in xrange(self._num_concurrent_runs):
+        inc_thread = threading.Thread(target=functools.partial(inc_job, index))
+        inc_thread.start()
+        inc_threads.append(inc_thread)
+      for inc_thread in inc_threads:
+        inc_thread.join()
+
+      self.assertAllClose(30.0 + 1.0 * self._num_concurrent_runs * 100,
+                          sess.run(v))
+
+      all_session_run_counts = []
+      for index in xrange(self._num_concurrent_runs):
+        dump = debug_data.DebugDumpDir(self._dump_roots[index])
+        self.assertTrue(dump.loaded_partition_graphs())
+
+        v_data = dump.get_tensors("v", 0, "DebugIdentity")
+        self.assertEqual(100, len(v_data))
+
+        # Examine all the core metadata files
+        core_metadata_files = glob.glob(
+            os.path.join(self._dump_roots[index], "_tfdbg_core*"))
+
+        timestamps = []
+        session_run_counts = []
+        executor_step_counts = []
+        for core_metadata_file in core_metadata_files:
+          with open(core_metadata_file, "rb") as f:
+            event = event_pb2.Event()
+            event.ParseFromString(f.read())
+            core_metadata = (
+                debug_data.extract_core_metadata_from_event_proto(event))
+            timestamps.append(event.wall_time)
+            session_run_counts.append(core_metadata.session_run_count)
+            executor_step_counts.append(core_metadata.executor_step_count)
+
+        all_session_run_counts.extend(session_run_counts)
+
+        # Assert that executor_step_count increases by one at a time.
+        executor_step_counts = zip(timestamps, executor_step_counts)
+        executor_step_counts = sorted(executor_step_counts, key=lambda x: x[0])
+        for i in xrange(len(executor_step_counts) - 1):
+          self.assertEquals(executor_step_counts[i][1] + 1,
+                            executor_step_counts[i + 1][1])
+
+        # Assert that session_run_count increase monotonically.
+        session_run_counts = zip(timestamps, session_run_counts)
+        session_run_counts = sorted(session_run_counts, key=lambda x: x[0])
+        for i in xrange(len(session_run_counts) - 1):
+          self.assertGreater(session_run_counts[i + 1][1],
+                             session_run_counts[i][1])
+
+      # Assert that the session_run_counts from the concurrent run() calls are
+      # all unique.
+      self.assertEqual(len(all_session_run_counts),
+                       len(set(all_session_run_counts)))
+
+
 if __name__ == "__main__":
   googletest.main()