diff --git a/tensorflow/core/common_runtime/constant_folding.cc b/tensorflow/core/common_runtime/constant_folding.cc index ee4d6f2bfd9..910f67bf816 100644 --- a/tensorflow/core/common_runtime/constant_folding.cc +++ b/tensorflow/core/common_runtime/constant_folding.cc @@ -321,6 +321,7 @@ bool DoConstantFolding(const ConstantFoldingOptions& opts, Graph* graph) { core::ScopedUnref rendez_unref(rendez); Executor::Args args; + args.step_id = Executor::Args::CONSTANT_FOLDING_STEP_ID; args.runner = runner; args.rendezvous = rendez; diff --git a/tensorflow/core/common_runtime/direct_session.cc b/tensorflow/core/common_runtime/direct_session.cc index 67186492428..ce6f77fcced 100644 --- a/tensorflow/core/common_runtime/direct_session.cc +++ b/tensorflow/core/common_runtime/direct_session.cc @@ -15,6 +15,7 @@ limitations under the License. #include "tensorflow/core/common_runtime/direct_session.h" +#include <atomic> #include <string> #include <vector> @@ -89,6 +90,8 @@ string GetRendezvousKey(const string& tensor_name, } // namespace +std::atomic_int_fast64_t DirectSession::step_id_counter_(0); + // NOTE: On Android with a single device, there is never // a risk of an OpKernel blocking indefinitely: // @@ -286,9 +289,12 @@ Status DirectSession::Run(const NamedTensorList& inputs, }); Executor::Args args; + args.step_id = step_id_counter_.fetch_add(1); args.rendezvous = run_state.rendez; args.cancellation_manager = cancellation_manager_; args.runner = [this](Executor::Args::Closure c) { SchedClosure(c); }; + LOG(INFO) << "Step " << args.step_id << " is for handle " + << run_state_args.handle; for (const auto& item : executors_and_keys->items) { item.executor->RunAsync(args, barrier->Get()); @@ -350,9 +356,15 @@ Status DirectSession::PRunSetup(const std::vector<string>& input_names, }); Executor::Args args; + { + mutex_lock l(mu_); + args.step_id = name_counter_++; + } args.rendezvous = run_state->rendez; args.cancellation_manager = cancellation_manager_; args.runner = [this](Executor::Args::Closure c) { SchedClosure(c); }; + LOG(INFO) << "Step " << args.step_id << " is for handle " + << run_state_args.handle; for (auto& item : executors_and_keys->items) { Executor* exec = item.executor; diff --git a/tensorflow/core/common_runtime/direct_session.h b/tensorflow/core/common_runtime/direct_session.h index 9c7c57d6c72..4b042fe70b0 100644 --- a/tensorflow/core/common_runtime/direct_session.h +++ b/tensorflow/core/common_runtime/direct_session.h @@ -16,6 +16,7 @@ limitations under the License. #ifndef TENSORFLOW_COMMON_RUNTIME_DIRECT_SESSION_H_ #define TENSORFLOW_COMMON_RUNTIME_DIRECT_SESSION_H_ +#include <atomic> #include <memory> #include <string> #include <unordered_map> @@ -230,6 +231,9 @@ class DirectSession : public Session { // For generating unique names. int64 name_counter_ GUARDED_BY(mu_) = 0; + // For generating step ids that are unique across all sessions. + static std::atomic_int_fast64_t step_id_counter_; + TF_DISALLOW_COPY_AND_ASSIGN(DirectSession); }; diff --git a/tensorflow/core/common_runtime/executor.cc b/tensorflow/core/common_runtime/executor.cc index c1d6e873c58..37a71cabdb1 100644 --- a/tensorflow/core/common_runtime/executor.cc +++ b/tensorflow/core/common_runtime/executor.cc @@ -634,6 +634,7 @@ class ExecutorState { typedef gtl::InlinedVector<TaggedNode, 8> TaggedNodeSeq; typedef gtl::InlinedVector<Entry, 4> EntryVector; + int64 step_id_; // Not owned. Rendezvous* rendezvous_; StepStatsCollector* stats_collector_; @@ -766,7 +767,8 @@ class ExecutorState { }; ExecutorState::ExecutorState(const Executor::Args& args, ExecutorImpl* impl) - : rendezvous_(args.rendezvous), + : step_id_(args.step_id), + rendezvous_(args.rendezvous), stats_collector_(args.stats_collector), slice_reader_cache_(new checkpoint::TensorSliceReaderCacheWrapper), call_frame_(args.call_frame), @@ -906,6 +908,7 @@ void ExecutorState::Process(TaggedNode tagged_node, int64 scheduled_usec) { AllocatorAttributeVec input_alloc_attrs; OpKernelContext::Params params; + params.step_id = step_id_; Device* device = impl_->params_.device; params.device = device; // track allocations if and only if we are collecting statistics @@ -948,7 +951,8 @@ void ExecutorState::Process(TaggedNode tagged_node, int64 scheduled_usec) { nodestats::SetAllStart(stats); } - VLOG(1) << "Process node: " << id << " " << SummarizeNodeDef(node->def()); + VLOG(1) << "Process node: " << id << " step " << params.step_id << " " + << SummarizeNodeDef(node->def()); Entry* input_tensors = GetInputTensors(input_frame, input_iter); Entry* first_input = input_tensors + item.input_start; diff --git a/tensorflow/core/common_runtime/executor.h b/tensorflow/core/common_runtime/executor.h index 0a8e3afa386..846a3e17758 100644 --- a/tensorflow/core/common_runtime/executor.h +++ b/tensorflow/core/common_runtime/executor.h @@ -55,6 +55,11 @@ class Executor { // are alive at least until done is invoked. All pointers to the // argument objects can be nullptr. // + // "step_id" is a process-wide unique identifier for the step being + // run. Executors on different devices may receive the same step_id + // in the case that a step runs Ops on more than one device. The + // step_id is used for tracking resource usage of a given step. + // // RunAsync() uses the given "rendezvous", if not null, as the // mechanism to communicate inputs and outputs of the underlying // graph computation. @@ -75,6 +80,13 @@ class Executor { // RunAsync() dispatches closures to "runner". Typically, "runner" // is backed up by a bounded threadpool. struct Args { + // Executors are sometimes instantiated for initialization work + // like constant folding that is logically outside any computation + // step, and SpecialStepIds lists the ids used for those steps. + enum SpecialStepIds { + CONSTANT_FOLDING_STEP_ID = -1, + }; + int64 step_id = 0; Rendezvous* rendezvous = nullptr; StepStatsCollector* stats_collector = nullptr; FunctionCallFrame* call_frame = nullptr; diff --git a/tensorflow/core/common_runtime/function.cc b/tensorflow/core/common_runtime/function.cc index 2b9f24fa762..ceca3a53ce9 100644 --- a/tensorflow/core/common_runtime/function.cc +++ b/tensorflow/core/common_runtime/function.cc @@ -362,6 +362,7 @@ class CallOp : public AsyncOpKernel { errors::Internal("No function library is provided."), done); FunctionLibraryRuntime::Options opts; + opts.step_id = ctx->step_id(); std::vector<Tensor> args; args.reserve(ctx->num_inputs()); for (int i = 0; i < ctx->num_inputs(); ++i) { @@ -406,6 +407,7 @@ class SymbolicGradientOp : public AsyncOpKernel { ctx, lib->Instantiate(kGradientOp, def().attr(), &handle_), done); FunctionLibraryRuntime::Options opts; + opts.step_id = ctx->step_id(); std::vector<Tensor> args; args.reserve(ctx->num_inputs()); for (int i = 0; i < ctx->num_inputs(); ++i) { @@ -671,6 +673,8 @@ void FunctionLibraryRuntimeImpl::Run(const Options& opts, Handle handle, return done(s); } Executor::Args exec_args; + // Inherit the step_id from the caller. + exec_args.step_id = opts.step_id; exec_args.call_frame = frame; exec_args.cancellation_manager = opts.cancellation_manager; exec_args.runner = runner_; diff --git a/tensorflow/core/framework/function.h b/tensorflow/core/framework/function.h index 8231b91a3ff..e4230edaac6 100644 --- a/tensorflow/core/framework/function.h +++ b/tensorflow/core/framework/function.h @@ -305,6 +305,8 @@ class FunctionLibraryRuntime { // Does not take ownership of "rets". struct Options { CancellationManager* cancellation_manager = nullptr; + // The id of the step that is calling this function. + int64 step_id = 0; }; typedef std::function<void(const Status&)> DoneCallback; virtual void Run(const Options& opts, Handle handle, diff --git a/tensorflow/core/framework/op_kernel.h b/tensorflow/core/framework/op_kernel.h index e4ffcd7352e..eb33c5ea4ed 100644 --- a/tensorflow/core/framework/op_kernel.h +++ b/tensorflow/core/framework/op_kernel.h @@ -447,6 +447,9 @@ class OpKernelContext { struct Params { ~Params() { delete eigen_gpu_device; } + // The step being executed. + int64 step_id = 0; + // The op kernel being computed. OpKernel* op_kernel = nullptr; @@ -528,6 +531,8 @@ class OpKernelContext { Env* env() const { return params_->device->env(); } + int64 step_id() const { return params_->step_id; } + const OpKernel& op_kernel() const { return *params_->op_kernel; } // Input/output signature.