Add a step_id field to OpKernelContext.

The step_id indicates which step triggered the Op execution. This will
allow us to "charge" resource usage to a specific step.
Change: 115622649
This commit is contained in:
A. Unique TensorFlower 2016-02-25 17:17:05 -08:00 committed by TensorFlower Gardener
parent ca69460901
commit e8260f3433
8 changed files with 46 additions and 2 deletions

View File

@ -321,6 +321,7 @@ bool DoConstantFolding(const ConstantFoldingOptions& opts, Graph* graph) {
core::ScopedUnref rendez_unref(rendez);
Executor::Args args;
args.step_id = Executor::Args::CONSTANT_FOLDING_STEP_ID;
args.runner = runner;
args.rendezvous = rendez;

View File

@ -15,6 +15,7 @@ limitations under the License.
#include "tensorflow/core/common_runtime/direct_session.h"
#include <atomic>
#include <string>
#include <vector>
@ -89,6 +90,8 @@ string GetRendezvousKey(const string& tensor_name,
} // namespace
std::atomic_int_fast64_t DirectSession::step_id_counter_(0);
// NOTE: On Android with a single device, there is never
// a risk of an OpKernel blocking indefinitely:
//
@ -286,9 +289,12 @@ Status DirectSession::Run(const NamedTensorList& inputs,
});
Executor::Args args;
args.step_id = step_id_counter_.fetch_add(1);
args.rendezvous = run_state.rendez;
args.cancellation_manager = cancellation_manager_;
args.runner = [this](Executor::Args::Closure c) { SchedClosure(c); };
LOG(INFO) << "Step " << args.step_id << " is for handle "
<< run_state_args.handle;
for (const auto& item : executors_and_keys->items) {
item.executor->RunAsync(args, barrier->Get());
@ -350,9 +356,15 @@ Status DirectSession::PRunSetup(const std::vector<string>& input_names,
});
Executor::Args args;
{
mutex_lock l(mu_);
args.step_id = name_counter_++;
}
args.rendezvous = run_state->rendez;
args.cancellation_manager = cancellation_manager_;
args.runner = [this](Executor::Args::Closure c) { SchedClosure(c); };
LOG(INFO) << "Step " << args.step_id << " is for handle "
<< run_state_args.handle;
for (auto& item : executors_and_keys->items) {
Executor* exec = item.executor;

View File

@ -16,6 +16,7 @@ limitations under the License.
#ifndef TENSORFLOW_COMMON_RUNTIME_DIRECT_SESSION_H_
#define TENSORFLOW_COMMON_RUNTIME_DIRECT_SESSION_H_
#include <atomic>
#include <memory>
#include <string>
#include <unordered_map>
@ -230,6 +231,9 @@ class DirectSession : public Session {
// For generating unique names.
int64 name_counter_ GUARDED_BY(mu_) = 0;
// For generating step ids that are unique across all sessions.
static std::atomic_int_fast64_t step_id_counter_;
TF_DISALLOW_COPY_AND_ASSIGN(DirectSession);
};

View File

@ -634,6 +634,7 @@ class ExecutorState {
typedef gtl::InlinedVector<TaggedNode, 8> TaggedNodeSeq;
typedef gtl::InlinedVector<Entry, 4> EntryVector;
int64 step_id_;
// Not owned.
Rendezvous* rendezvous_;
StepStatsCollector* stats_collector_;
@ -766,7 +767,8 @@ class ExecutorState {
};
ExecutorState::ExecutorState(const Executor::Args& args, ExecutorImpl* impl)
: rendezvous_(args.rendezvous),
: step_id_(args.step_id),
rendezvous_(args.rendezvous),
stats_collector_(args.stats_collector),
slice_reader_cache_(new checkpoint::TensorSliceReaderCacheWrapper),
call_frame_(args.call_frame),
@ -906,6 +908,7 @@ void ExecutorState::Process(TaggedNode tagged_node, int64 scheduled_usec) {
AllocatorAttributeVec input_alloc_attrs;
OpKernelContext::Params params;
params.step_id = step_id_;
Device* device = impl_->params_.device;
params.device = device;
// track allocations if and only if we are collecting statistics
@ -948,7 +951,8 @@ void ExecutorState::Process(TaggedNode tagged_node, int64 scheduled_usec) {
nodestats::SetAllStart(stats);
}
VLOG(1) << "Process node: " << id << " " << SummarizeNodeDef(node->def());
VLOG(1) << "Process node: " << id << " step " << params.step_id << " "
<< SummarizeNodeDef(node->def());
Entry* input_tensors = GetInputTensors(input_frame, input_iter);
Entry* first_input = input_tensors + item.input_start;

View File

@ -55,6 +55,11 @@ class Executor {
// are alive at least until done is invoked. All pointers to the
// argument objects can be nullptr.
//
// "step_id" is a process-wide unique identifier for the step being
// run. Executors on different devices may receive the same step_id
// in the case that a step runs Ops on more than one device. The
// step_id is used for tracking resource usage of a given step.
//
// RunAsync() uses the given "rendezvous", if not null, as the
// mechanism to communicate inputs and outputs of the underlying
// graph computation.
@ -75,6 +80,13 @@ class Executor {
// RunAsync() dispatches closures to "runner". Typically, "runner"
// is backed up by a bounded threadpool.
struct Args {
// Executors are sometimes instantiated for initialization work
// like constant folding that is logically outside any computation
// step, and SpecialStepIds lists the ids used for those steps.
enum SpecialStepIds {
CONSTANT_FOLDING_STEP_ID = -1,
};
int64 step_id = 0;
Rendezvous* rendezvous = nullptr;
StepStatsCollector* stats_collector = nullptr;
FunctionCallFrame* call_frame = nullptr;

View File

@ -362,6 +362,7 @@ class CallOp : public AsyncOpKernel {
errors::Internal("No function library is provided."),
done);
FunctionLibraryRuntime::Options opts;
opts.step_id = ctx->step_id();
std::vector<Tensor> args;
args.reserve(ctx->num_inputs());
for (int i = 0; i < ctx->num_inputs(); ++i) {
@ -406,6 +407,7 @@ class SymbolicGradientOp : public AsyncOpKernel {
ctx, lib->Instantiate(kGradientOp, def().attr(), &handle_), done);
FunctionLibraryRuntime::Options opts;
opts.step_id = ctx->step_id();
std::vector<Tensor> args;
args.reserve(ctx->num_inputs());
for (int i = 0; i < ctx->num_inputs(); ++i) {
@ -671,6 +673,8 @@ void FunctionLibraryRuntimeImpl::Run(const Options& opts, Handle handle,
return done(s);
}
Executor::Args exec_args;
// Inherit the step_id from the caller.
exec_args.step_id = opts.step_id;
exec_args.call_frame = frame;
exec_args.cancellation_manager = opts.cancellation_manager;
exec_args.runner = runner_;

View File

@ -305,6 +305,8 @@ class FunctionLibraryRuntime {
// Does not take ownership of "rets".
struct Options {
CancellationManager* cancellation_manager = nullptr;
// The id of the step that is calling this function.
int64 step_id = 0;
};
typedef std::function<void(const Status&)> DoneCallback;
virtual void Run(const Options& opts, Handle handle,

View File

@ -447,6 +447,9 @@ class OpKernelContext {
struct Params {
~Params() { delete eigen_gpu_device; }
// The step being executed.
int64 step_id = 0;
// The op kernel being computed.
OpKernel* op_kernel = nullptr;
@ -528,6 +531,8 @@ class OpKernelContext {
Env* env() const { return params_->device->env(); }
int64 step_id() const { return params_->step_id; }
const OpKernel& op_kernel() const { return *params_->op_kernel; }
// Input/output signature.