Remove MetadataListener and StepStats from EagerContext
PiperOrigin-RevId: 263789105
parent 79a430dce9
commit 0c9c859a73
Changed directories:
tensorflow/c/eager
tensorflow/core/common_runtime/eager
tensorflow/python/eager
@@ -952,12 +952,10 @@ unsigned char TFE_ContextHasFunction(TFE_Context* ctx, const char* name) {

 void TFE_ContextEnableRunMetadata(TFE_Context* ctx) {
   ctx->context->SetShouldStoreGraphs(true);
-  ctx->context->SetShouldStoreStepStats(true);
 }

 void TFE_ContextDisableRunMetadata(TFE_Context* ctx) {
   ctx->context->SetShouldStoreGraphs(false);
-  ctx->context->SetShouldStoreStepStats(false);
 }

 }  // extern "C"
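For orientation, here is a minimal sketch of how a client toggles run-metadata collection through these C API entry points after this change. The context setup and teardown calls (TF_NewStatus, TFE_NewContextOptions, TFE_NewContext, TFE_DeleteContext) are standard TFE C API functions assumed here rather than part of this diff, and with this commit the enable/disable calls only govern graph collection, not step stats.

// Sketch only; error checking of `status` is omitted for brevity.
#include "tensorflow/c/c_api.h"
#include "tensorflow/c/eager/c_api.h"

int main() {
  TF_Status* status = TF_NewStatus();
  TFE_ContextOptions* opts = TFE_NewContextOptions();
  TFE_Context* ctx = TFE_NewContext(opts, status);
  TFE_DeleteContextOptions(opts);

  TFE_ContextEnableRunMetadata(ctx);   // after this commit: graphs only
  // ... run some eager ops through the C API ...
  TFE_ContextDisableRunMetadata(ctx);  // clears the stored graphs flag

  TFE_DeleteContext(ctx);
  TF_DeleteStatus(status);
  return 0;
}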
@@ -342,28 +342,9 @@ Status EagerContext::FindDeviceByName(const string& name,
 }

 void EagerContext::ClearRunMetadata() {
-  if (metadata_listener_ != nullptr) {
-    metadata_listener_->BeforeClearRunMetadata();
-  }
   run_metadata_.Clear();
 }

-Status EagerContext::RegisterRunMetadataListener(
-    RunMetadataListener* listener) {
-  mutex_lock l(metadata_mu_);
-  if (metadata_listener_ != nullptr) {
-    return Status(error::Code::INVALID_ARGUMENT,
-                  "Cannot run two eager profiler at the same time");
-  }
-  metadata_listener_ = listener;
-  return Status::OK();
-}
-
-void EagerContext::ClearRunMetadataListener() {
-  mutex_lock l(metadata_mu_);
-  metadata_listener_ = nullptr;
-}
-
 void EagerContext::StartStep() {
   mutex_lock ml(metadata_mu_);
   num_active_steps_++;
@@ -504,28 +485,12 @@ void EagerContext::AddKernelToCache(Fprint128 cache_key,
   }
 }

-bool EagerContext::ShouldStoreGraphs() {
-  mutex_lock ml(metadata_mu_);
-  return should_store_graphs_.load() || metadata_listener_ != nullptr;
-}
-
-bool EagerContext::ShouldStoreStepStats() {
-  mutex_lock ml(metadata_mu_);
-  return should_store_step_stats_.load() || metadata_listener_ != nullptr;
-}
+bool EagerContext::ShouldStoreGraphs() { return should_store_graphs_.load(); }

 void EagerContext::SetShouldStoreGraphs(bool value) {
   mutex_lock ml(metadata_mu_);
   should_store_graphs_.store(value);
-  if (!value || metadata_listener_ != nullptr) {
-    run_metadata_.Clear();
-  }
-}
-
-void EagerContext::SetShouldStoreStepStats(bool value) {
-  mutex_lock ml(metadata_mu_);
-  should_store_step_stats_.store(value);
-  if (!value || metadata_listener_ != nullptr) {
+  if (!value) {
     run_metadata_.Clear();
   }
 }
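After this change the graphs flag is the only metadata toggle left: reads go straight through the atomic without locking, while writes still take metadata_mu_ so that clearing run_metadata_ stays serialized with other writers. Below is a self-contained sketch of that pattern using illustrative names (plain std::mutex and a string standing in for the RunMetadata proto, not the TensorFlow types).

#include <atomic>
#include <mutex>
#include <string>

class MetadataStore {
 public:
  // Lock-free fast path: hot-path callers only read the atomic flag.
  bool ShouldStoreGraphs() const { return should_store_graphs_.load(); }

  // Updates take the mutex so that clearing the accumulated metadata cannot
  // race with writers holding the same lock.
  void SetShouldStoreGraphs(bool value) {
    std::lock_guard<std::mutex> lock(mu_);
    should_store_graphs_.store(value);
    if (!value) run_metadata_.clear();  // drop stale data when disabling
  }

  void Append(const std::string& entry) {
    std::lock_guard<std::mutex> lock(mu_);
    if (should_store_graphs_.load()) run_metadata_ += entry;
  }

 private:
  std::atomic<bool> should_store_graphs_{false};
  std::mutex mu_;
  std::string run_metadata_;  // stand-in for the RunMetadata proto
};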
@@ -243,17 +243,11 @@ class EagerContext : public core::RefCounted {

   // TODO(apassos) clean up RunMetadata storage.
   mutex* MetadataMu() LOCK_RETURNED(metadata_mu_) { return &metadata_mu_; }
-  bool ShouldStoreStepStats() LOCKS_EXCLUDED(metadata_mu_);
-  void SetShouldStoreStepStats(bool value);
   bool ShouldStoreGraphs() LOCKS_EXCLUDED(metadata_mu_);
   void SetShouldStoreGraphs(bool value);
   RunMetadata* RunMetadataProto() { return &run_metadata_; }
   void ClearRunMetadata() EXCLUSIVE_LOCKS_REQUIRED(metadata_mu_);

-  Status RegisterRunMetadataListener(RunMetadataListener* listener)
-      LOCKS_EXCLUDED(metadata_mu_);
-  void ClearRunMetadataListener() LOCKS_EXCLUDED(metadata_mu_);
-
   void StartStep();
   void EndStep();
   ScopedStepContainer* StepContainer();
@@ -399,11 +393,9 @@ class EagerContext : public core::RefCounted {
       GUARDED_BY(cache_mu_);

   // Whether we should compute RunMetadata.
-  std::atomic<bool> should_store_step_stats_{false};
   std::atomic<bool> should_store_graphs_{false};
   mutex metadata_mu_;
   RunMetadata run_metadata_ GUARDED_BY(metadata_mu_);
-  RunMetadataListener* metadata_listener_ GUARDED_BY(metadata_mu_) = nullptr;
   GraphCollector graph_collector_;
   // TODO(fishx): Allow update following two bool after context creation.
   const bool log_device_placement_;
@@ -126,7 +126,6 @@ const string DeviceNameOrUnspecified(const DeviceNameUtils::ParsedName& name) {
 // unset and we might have selected some specific device to run this op on.
 Status MaybeCopyInputToExpectedDevice(EagerOperation* op, Device* op_device,
                                       int i, Device* expected_input_device,
-                                      RunMetadata* run_metadata,
                                       TensorHandle** result) {
   tensorflow::TensorHandle* handle = op->Inputs()[i];
   EagerContext* ctx = op->EagerContext();
@@ -175,29 +174,12 @@ Status MaybeCopyInputToExpectedDevice(EagerOperation* op, Device* op_device,
   }
   // We are only here if the policy is warn or silent copies, so we should
   // trigger a copy.
-  auto pre_time_nanos = Env::Default()->NowNanos();
   TensorHandle* result_handle = nullptr;
+  profiler::TraceMe activity("_Send", profiler::TraceMeLevel::kInfo);
   Status status =
       EagerCopyToDevice(handle, ctx, op->Executor(), expected_input_device,
                         ctx->MirrorTensors(), &result_handle);
-  if (run_metadata != nullptr) {
-    auto* step_stats = run_metadata->mutable_step_stats();
-    MaybeInitializeStepStats(step_stats, ctx);
-    // Record the sending on the source device for now.
-    int device_idx = StepStatsDeviceIndex(step_stats, ctx, handle_device);
-    auto* dev_stats = step_stats->mutable_dev_stats(device_idx);
-    auto* node_stats = dev_stats->add_node_stats();
-    node_stats->set_node_name("_Send");
-    node_stats->set_all_start_micros(pre_time_nanos / EnvTime::kMicrosToNanos);
-    node_stats->set_all_start_nanos(pre_time_nanos);
-    int64 now_nanos = Env::Default()->NowNanos();
-    node_stats->set_op_end_rel_micros((now_nanos - pre_time_nanos) /
-                                      EnvTime::kMicrosToNanos);
-    node_stats->set_op_end_rel_nanos(now_nanos - pre_time_nanos);
-    node_stats->set_all_end_rel_micros((now_nanos - pre_time_nanos) /
-                                       EnvTime::kMicrosToNanos);
-    node_stats->set_all_end_rel_nanos(now_nanos - pre_time_nanos);
-  }
+  activity.Stop();
   if (!status.ok()) {
     if (result_handle != nullptr) result_handle->Unref();
     return errors::Internal("Failed copying input tensor from ",
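The hand-rolled NodeExecStats timing removed above gives way to the scoped profiler::TraceMe annotation, which records the copy from construction until Stop() (or destruction) with no manual micro/nano bookkeeping. Below is a rough standalone sketch of that RAII pattern with illustrative names, not the TensorFlow profiler types.

#include <chrono>
#include <cstdio>
#include <string>
#include <utility>

class ScopedActivity {
 public:
  explicit ScopedActivity(std::string name)
      : name_(std::move(name)), start_(std::chrono::steady_clock::now()) {}
  ~ScopedActivity() { Stop(); }

  // Ends the activity once; the destructor calls it if the caller does not.
  void Stop() {
    if (stopped_) return;
    stopped_ = true;
    auto elapsed = std::chrono::steady_clock::now() - start_;
    std::printf(
        "%s took %lld ns\n", name_.c_str(),
        static_cast<long long>(
            std::chrono::duration_cast<std::chrono::nanoseconds>(elapsed)
                .count()));
  }

 private:
  std::string name_;
  std::chrono::steady_clock::time_point start_;
  bool stopped_ = false;
};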
@@ -216,8 +198,7 @@ Status MaybeCopyInputToExpectedDevice(EagerOperation* op, Device* op_device,
 // unspecified.
 Status ValidateInputTypeAndPlacement(
     EagerContext* ctx, EagerOperation* op,
-    const core::RefCountPtr<KernelAndDevice>& kernel,
-    RunMetadata* run_metadata) {
+    const core::RefCountPtr<KernelAndDevice>& kernel) {
   profiler::TraceMe activity("ValidateInputTypeAndPlacement",
                              profiler::TraceMeLevel::kInfo);
   if (kernel->num_inputs() != op->Inputs().size()) {
@@ -228,7 +209,7 @@ Status ValidateInputTypeAndPlacement(
     Device* expected_device = kernel->InputDevice(i);
     TensorHandle* handle = nullptr;
     TF_RETURN_IF_ERROR(MaybeCopyInputToExpectedDevice(
-        op, kernel->device(), i, expected_device, run_metadata, &handle));
+        op, kernel->device(), i, expected_device, &handle));
     op->UpdateInput(i, handle);
     // Unref handle since it has a ref as an input now
     handle->Unref();
@@ -642,29 +623,13 @@ Status EagerLocalExecute(EagerOperation* op, TensorHandle** retvals,
                                 *num_retvals);
   }
   *num_retvals = num_outputs;
-  TF_RETURN_IF_ERROR(ValidateInputTypeAndPlacement(
-      ctx, op, kernel,
-      ctx->ShouldStoreStepStats() ? ctx->RunMetadataProto() : nullptr));
+  TF_RETURN_IF_ERROR(ValidateInputTypeAndPlacement(ctx, op, kernel));

-  std::unique_ptr<NodeExecStats> maybe_stats;
   StepStats* maybe_step_stats = nullptr;
   GraphCollector* graph_collector = nullptr;
   if (ctx->ShouldStoreGraphs()) {
     graph_collector = ctx->GetGraphCollector();
   }
-  if (ctx->ShouldStoreStepStats()) {
-    maybe_step_stats = ctx->RunMetadataProto()->mutable_step_stats();
-    int64 now_nanos = Env::Default()->NowNanos();
-    maybe_stats.reset(new NodeExecStats);
-    maybe_stats->set_node_name(op->Name());
-    maybe_stats->set_all_start_micros(now_nanos / EnvTime::kMicrosToNanos);
-    maybe_stats->set_all_start_nanos(now_nanos);
-    maybe_stats->set_op_start_rel_micros(0);
-    maybe_stats->set_op_start_rel_nanos(0);
-    maybe_stats->set_scheduled_micros(now_nanos / EnvTime::kMicrosToNanos);
-    maybe_stats->set_scheduled_nanos(now_nanos);
-    // TODO(apassos) track referenced tensors
-  }

   for (int i = 0; i < num_outputs; ++i) {
     TF_RETURN_IF_ERROR(TensorHandle::CreateAsyncLocalHandle(
@@ -674,10 +639,10 @@ Status EagerLocalExecute(EagerOperation* op, TensorHandle** retvals,
             output_dtypes[i], ctx, &retvals[i]));
   }

-  std::unique_ptr<EagerNode> node(new ExecuteNode(
-      ctx, op->Inputs(), std::move(kernel), maybe_stats.release(),
-      maybe_step_stats, graph_collector, output_dtypes,
-      op->GetCancellationManager(), {retvals, num_outputs}));
+  std::unique_ptr<EagerNode> node(
+      new ExecuteNode(ctx, op->Inputs(), std::move(kernel), nullptr,
+                      maybe_step_stats, graph_collector, output_dtypes,
+                      op->GetCancellationManager(), {retvals, num_outputs}));
   // Note that for async mode, execution order will make sure that all
   // input handles are ready before executing them.
   // TODO(b/137118203): Consider executing "cheap" kernels inline for
@@ -749,8 +714,7 @@ Status EagerRemoteExecute(EagerOperation* op, TensorHandle** retvals,
       // the op might have its inputs on host memory.
       TensorHandle* handle = nullptr;
       TF_RETURN_IF_ERROR(MaybeCopyInputToExpectedDevice(
-          op, op->Device(), i, remote_cpu_device,
-          /* run_metadata= */ nullptr, &handle));
+          op, op->Device(), i, remote_cpu_device, &handle));
       op->UpdateInput(i, handle);
       input = handle;
       input_device = remote_cpu_device;
@@ -1049,44 +1013,6 @@ Status EagerKernelExecute(EagerContext* ctx,
       collector->ClearGraphs();
     }
   }
-  if (maybe_stats != nullptr) {
-    int64 nanos = Env::Default()->NowNanos();
-    maybe_stats->set_op_end_rel_micros(nanos / EnvTime::kMicrosToNanos -
-                                       maybe_stats->all_start_micros());
-    maybe_stats->set_op_end_rel_nanos(nanos - maybe_stats->all_start_nanos());
-    maybe_stats->set_all_end_rel_micros(nanos / EnvTime::kMicrosToNanos -
-                                        maybe_stats->all_start_micros());
-    maybe_stats->set_all_end_rel_nanos(nanos - maybe_stats->all_start_nanos());
-    if (ctx->ShouldStoreStepStats()) {
-      mutex_lock ml(*ctx->MetadataMu());
-      {
-        auto* step_stats = ctx->RunMetadataProto()->mutable_step_stats();
-        // Lazily initialize the RunMetadata with information about all devices
-        // if this is the first call.
-        while (step_stats->dev_stats_size() < ctx->devices()->size()) {
-          step_stats->add_dev_stats();
-        }
-        // Find the current device's index.
-        // If device is a nullptr (we are running a function without explicitly
-        // requested device), attribute the function runtime to CPU.
-        Device* attribution_device = kernel->device();
-        if (attribution_device == nullptr) {
-          attribution_device = ctx->HostCPU();
-        }
-        int device_idx = 0;
-        for (int i = 0; i < ctx->devices()->size(); ++i) {
-          if (ctx->devices()->at(i) == attribution_device) {
-            device_idx = i;
-            break;
-          }
-        }
-        // Populate the device stats for this device.
-        auto* dev_stats = step_stats->mutable_dev_stats(device_idx);
-        dev_stats->set_device(attribution_device->name());
-        *dev_stats->add_node_stats() = *maybe_stats;
-      }
-    }
-  }
   DCHECK_EQ(retvals.size(), outputs.size());
   for (int i = 0; i < retvals.size(); ++i) {
     DCHECK_EQ(kernel->device(), retvals[i]->op_device());
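The block removed above attributed a kernel's node stats to a device by scanning the context's device list for the kernel's device and falling back to the host CPU at index 0. A standalone sketch of that lookup with illustrative types (plain strings rather than tensorflow::Device*):

#include <string>
#include <vector>

// Returns the index of `kernel_device` in `devices`, or 0 (the CPU slot)
// when the kernel recorded no device, mirroring the removed attribution loop.
int AttributionDeviceIndex(const std::vector<std::string>& devices,
                           const std::string& kernel_device) {
  for (int i = 0; i < static_cast<int>(devices.size()); ++i) {
    if (devices[i] == kernel_device) return i;  // exact match wins
  }
  return 0;  // default: attribute to the first (CPU) device
}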
@@ -323,19 +323,6 @@ class TFETest(test_util.TensorFlowTestCase):
     self.assertTrue(has_cpu_device)
     del ctx

-  def testRunMetadata(self):
-    context.enable_run_metadata()
-    t = constant_op.constant(1.0)
-    _ = t + t  # Runs an operation which will be in the RunMetadata
-    run_metadata = context.export_run_metadata()
-    context.disable_run_metadata()
-    step_stats = run_metadata.step_stats
-    self.assertGreater(len(step_stats.dev_stats), 0)
-    cpu_stats = step_stats.dev_stats[0]
-    self.assertEqual('/job:localhost/replica:0/task:0/device:CPU:0',
-                     cpu_stats.device)
-    self.assertGreaterEqual(len(cpu_stats.node_stats), 1)
-
   def testMultiCpuPlacement(self):
     with ops.device('cpu:1'):
       x = constant_op.constant(1.0)
@@ -833,16 +833,6 @@ class FunctionTest(test.TestCase, parameterized.TestCase):
     f(constant_op.constant(1.0))
     run_metadata = context.export_run_metadata()
     context.disable_run_metadata()
-    step_stats = run_metadata.step_stats
-    self.assertNotEmpty(step_stats.dev_stats)
-    cpu_stats = step_stats.dev_stats[0]
-    self.assertEqual('/job:localhost/replica:0/task:0/device:CPU:0',
-                     cpu_stats.device)
-    # Testing for at least 2 because the function call should generate at most
-    # one entry in the step_stats; the ops inside function can generate
-    # arbitrarily many (placeholders, return identities, etc, might be included
-    # or not in the future, so shouldn't be tested for exactly.
-    self.assertGreaterEqual(len(cpu_stats.node_stats), 2)
     self.assertLen(run_metadata.partition_graphs, 1)

   def testGraphModeCaptureVariable(self):