Add delayed start capability to profiler session.
PiperOrigin-RevId: 339490700 Change-Id: Ib06ca2d97009562dfd27563774f6d9db939e7e11
This commit is contained in:
parent
9587103251
commit
a838a5f5a1
tensorflow
@ -55,6 +55,7 @@ cc_library(
|
|||||||
"//tensorflow/core/profiler/protobuf:xplane_proto_cc",
|
"//tensorflow/core/profiler/protobuf:xplane_proto_cc",
|
||||||
"//tensorflow/core/profiler:profiler_options_proto_cc",
|
"//tensorflow/core/profiler:profiler_options_proto_cc",
|
||||||
"@com_google_absl//absl/memory",
|
"@com_google_absl//absl/memory",
|
||||||
|
"@com_google_absl//absl/time",
|
||||||
"//tensorflow/core:protos_all_cc",
|
"//tensorflow/core:protos_all_cc",
|
||||||
] + if_not_android([
|
] + if_not_android([
|
||||||
":profiler_lock",
|
":profiler_lock",
|
||||||
|
@ -18,7 +18,10 @@ limitations under the License.
|
|||||||
#include <memory>
|
#include <memory>
|
||||||
|
|
||||||
#include "absl/memory/memory.h"
|
#include "absl/memory/memory.h"
|
||||||
|
#include "absl/time/clock.h"
|
||||||
|
#include "absl/time/time.h"
|
||||||
#include "tensorflow/core/platform/env_time.h"
|
#include "tensorflow/core/platform/env_time.h"
|
||||||
|
#include "tensorflow/core/platform/errors.h"
|
||||||
#include "tensorflow/core/platform/logging.h"
|
#include "tensorflow/core/platform/logging.h"
|
||||||
#include "tensorflow/core/platform/mutex.h"
|
#include "tensorflow/core/platform/mutex.h"
|
||||||
#include "tensorflow/core/platform/platform.h"
|
#include "tensorflow/core/platform/platform.h"
|
||||||
@ -65,6 +68,7 @@ tensorflow::Status ProfilerSession::Status() {
|
|||||||
Status ProfilerSession::CollectData(profiler::XSpace* space) {
|
Status ProfilerSession::CollectData(profiler::XSpace* space) {
|
||||||
mutex_lock l(mutex_);
|
mutex_lock l(mutex_);
|
||||||
if (!status_.ok()) return status_;
|
if (!status_.ok()) return status_;
|
||||||
|
LOG(INFO) << "Profiler session collecting data.";
|
||||||
for (auto& profiler : profilers_) {
|
for (auto& profiler : profilers_) {
|
||||||
profiler->Stop().IgnoreError();
|
profiler->Stop().IgnoreError();
|
||||||
}
|
}
|
||||||
@ -116,7 +120,6 @@ ProfilerSession::ProfilerSession(ProfileOptions options)
|
|||||||
#else
|
#else
|
||||||
: active_(false),
|
: active_(false),
|
||||||
#endif
|
#endif
|
||||||
start_time_ns_(EnvTime::NowNanos()),
|
|
||||||
options_(std::move(options)) {
|
options_(std::move(options)) {
|
||||||
if (!active_) {
|
if (!active_) {
|
||||||
#if !defined(IS_MOBILE_PLATFORM)
|
#if !defined(IS_MOBILE_PLATFORM)
|
||||||
@ -130,6 +133,25 @@ ProfilerSession::ProfilerSession(ProfileOptions options)
|
|||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
LOG(INFO) << "Profiler session initializing.";
|
||||||
|
// Sleep until it is time to start profiling.
|
||||||
|
const bool delayed_start = options_.start_timestamp_ns() > 0;
|
||||||
|
if (delayed_start) {
|
||||||
|
absl::Time scheduled_start =
|
||||||
|
absl::FromUnixNanos(options_.start_timestamp_ns());
|
||||||
|
auto now = absl::Now();
|
||||||
|
if (scheduled_start < now) {
|
||||||
|
LOG(WARNING) << "Profiling is late (" << now
|
||||||
|
<< ") for the scheduled start (" << scheduled_start
|
||||||
|
<< ") and will start immediately.";
|
||||||
|
} else {
|
||||||
|
absl::Duration sleep_duration = scheduled_start - now;
|
||||||
|
LOG(INFO) << "Delaying start of profiler session by " << sleep_duration;
|
||||||
|
absl::SleepFor(sleep_duration);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
start_time_ns_ = EnvTime::NowNanos();
|
||||||
LOG(INFO) << "Profiler session started.";
|
LOG(INFO) << "Profiler session started.";
|
||||||
|
|
||||||
#if !defined(IS_MOBILE_PLATFORM)
|
#if !defined(IS_MOBILE_PLATFORM)
|
||||||
@ -147,7 +169,7 @@ ProfilerSession::ProfilerSession(ProfileOptions options)
|
|||||||
}
|
}
|
||||||
|
|
||||||
ProfilerSession::~ProfilerSession() {
|
ProfilerSession::~ProfilerSession() {
|
||||||
VLOG(1) << "Profiler session stopping.";
|
LOG(INFO) << "Profiler session tear down.";
|
||||||
for (auto& profiler : profilers_) {
|
for (auto& profiler : profilers_) {
|
||||||
profiler->Stop().IgnoreError();
|
profiler->Stop().IgnoreError();
|
||||||
}
|
}
|
||||||
|
@ -79,7 +79,7 @@ class ProfilerSession {
|
|||||||
bool active_ TF_GUARDED_BY(mutex_);
|
bool active_ TF_GUARDED_BY(mutex_);
|
||||||
|
|
||||||
tensorflow::Status status_ TF_GUARDED_BY(mutex_);
|
tensorflow::Status status_ TF_GUARDED_BY(mutex_);
|
||||||
const uint64 start_time_ns_;
|
uint64 start_time_ns_;
|
||||||
mutex mutex_;
|
mutex mutex_;
|
||||||
ProfileOptions options_;
|
ProfileOptions options_;
|
||||||
};
|
};
|
||||||
|
@ -63,7 +63,7 @@ message ProfileOptions {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// Options for remote profiler session manager.
|
// Options for remote profiler session manager.
|
||||||
// Next ID: 5
|
// Next ID: 6
|
||||||
message RemoteProfilerSessionManagerOptions {
|
message RemoteProfilerSessionManagerOptions {
|
||||||
// Options for each local profiler.
|
// Options for each local profiler.
|
||||||
ProfileOptions profiler_options = 1;
|
ProfileOptions profiler_options = 1;
|
||||||
@ -79,4 +79,7 @@ message RemoteProfilerSessionManagerOptions {
|
|||||||
// continues until interrupted. Otherwise, value must be greater than
|
// continues until interrupted. Otherwise, value must be greater than
|
||||||
// profiler_options.duration_ms.
|
// profiler_options.duration_ms.
|
||||||
uint64 max_session_duration_ms = 4;
|
uint64 max_session_duration_ms = 4;
|
||||||
|
|
||||||
|
// Start of profiling is delayed by this much (in milliseconds).
|
||||||
|
uint64 delay_ms = 5;
|
||||||
}
|
}
|
||||||
|
@ -37,6 +37,7 @@ cc_library(
|
|||||||
"//tensorflow/core/profiler/convert:xplane_to_profile_response",
|
"//tensorflow/core/profiler/convert:xplane_to_profile_response",
|
||||||
"//tensorflow/core/profiler/protobuf:xplane_proto_cc",
|
"//tensorflow/core/profiler/protobuf:xplane_proto_cc",
|
||||||
"@com_google_absl//absl/strings",
|
"@com_google_absl//absl/strings",
|
||||||
|
"@com_google_absl//absl/time",
|
||||||
],
|
],
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@ -21,6 +21,8 @@ limitations under the License.
|
|||||||
|
|
||||||
#include "absl/strings/str_join.h"
|
#include "absl/strings/str_join.h"
|
||||||
#include "absl/strings/str_split.h"
|
#include "absl/strings/str_split.h"
|
||||||
|
#include "absl/time/clock.h"
|
||||||
|
#include "absl/time/time.h"
|
||||||
#include "tensorflow/core/platform/errors.h"
|
#include "tensorflow/core/platform/errors.h"
|
||||||
#include "tensorflow/core/platform/host_info.h"
|
#include "tensorflow/core/platform/host_info.h"
|
||||||
#include "tensorflow/core/platform/status.h"
|
#include "tensorflow/core/platform/status.h"
|
||||||
@ -176,7 +178,7 @@ Status NewSession(absl::string_view repository_root,
|
|||||||
} // namespace
|
} // namespace
|
||||||
|
|
||||||
Status Trace(const std::string& logdir, int num_tracing_attempts,
|
Status Trace(const std::string& logdir, int num_tracing_attempts,
|
||||||
const RemoteProfilerSessionManagerOptions& opts,
|
RemoteProfilerSessionManagerOptions& opts,
|
||||||
bool is_cloud_tpu_session) {
|
bool is_cloud_tpu_session) {
|
||||||
DCHECK_GT(opts.profiler_options().duration_ms(), 0);
|
DCHECK_GT(opts.profiler_options().duration_ms(), 0);
|
||||||
DCHECK(!opts.service_addresses().empty());
|
DCHECK(!opts.service_addresses().empty());
|
||||||
@ -190,6 +192,14 @@ Status Trace(const std::string& logdir, int num_tracing_attempts,
|
|||||||
Status status;
|
Status status;
|
||||||
int remaining_attempts = num_tracing_attempts;
|
int remaining_attempts = num_tracing_attempts;
|
||||||
while (true) {
|
while (true) {
|
||||||
|
auto start_timestamp = absl::Now() + absl::Milliseconds(opts.delay_ms());
|
||||||
|
opts.mutable_profiler_options()->set_start_timestamp_ns(
|
||||||
|
absl::ToUnixNanos(start_timestamp));
|
||||||
|
LOG(INFO) << "Profiler delay_ms was " << opts.delay_ms()
|
||||||
|
<< ", start_timestamp_ns set to "
|
||||||
|
<< opts.profiler_options().start_timestamp_ns() << " ["
|
||||||
|
<< start_timestamp << "]";
|
||||||
|
|
||||||
std::cout << "Starting to trace for " << duration_ms << " ms. "
|
std::cout << "Starting to trace for " << duration_ms << " ms. "
|
||||||
<< "Remaining attempt(s): " << --remaining_attempts << std::endl;
|
<< "Remaining attempt(s): " << --remaining_attempts << std::endl;
|
||||||
|
|
||||||
|
@ -40,7 +40,7 @@ Status Monitor(const std::string& service_addr, int duration_ms,
|
|||||||
// in the given logdir. If no trace was collected, retries tracing for
|
// in the given logdir. If no trace was collected, retries tracing for
|
||||||
// num_tracing_attempts. Assumes that options have been validated.
|
// num_tracing_attempts. Assumes that options have been validated.
|
||||||
Status Trace(const std::string& logdir, int num_tracing_attempts,
|
Status Trace(const std::string& logdir, int num_tracing_attempts,
|
||||||
const RemoteProfilerSessionManagerOptions& opts,
|
RemoteProfilerSessionManagerOptions& opts,
|
||||||
bool is_cloud_tpu_session);
|
bool is_cloud_tpu_session);
|
||||||
|
|
||||||
} // namespace profiler
|
} // namespace profiler
|
||||||
|
@ -116,8 +116,17 @@ class ProfilerApiTest(test_util.TensorFlowTestCase):
|
|||||||
thread_worker.join(120)
|
thread_worker.join(120)
|
||||||
self._check_xspace_pb_exist(logdir)
|
self._check_xspace_pb_exist(logdir)
|
||||||
|
|
||||||
def test_single_worker_sampling_mode_delayed(self):
|
def test_single_worker_sampling_mode_short_delay(self):
|
||||||
"""Test single worker sampling mode with delay."""
|
"""Test single worker sampling mode with a short delay.
|
||||||
|
|
||||||
|
Expect that requested delayed start time will arrive late, and a subsequent
|
||||||
|
retry will issue an immediate start.
|
||||||
|
"""
|
||||||
|
|
||||||
|
self.test_single_worker_sampling_mode(delay_ms=1)
|
||||||
|
|
||||||
|
def test_single_worker_sampling_mode_long_delay(self):
|
||||||
|
"""Test single worker sampling mode with a long delay."""
|
||||||
|
|
||||||
self.test_single_worker_sampling_mode(delay_ms=1000)
|
self.test_single_worker_sampling_mode(delay_ms=1000)
|
||||||
|
|
||||||
|
@ -146,7 +146,6 @@ RemoteProfilerSessionManagerOptions GetOptionsLocked(absl::string_view logdir,
|
|||||||
VLOG(2) << "repository_path set to "
|
VLOG(2) << "repository_path set to "
|
||||||
<< options.profiler_options().repository_path();
|
<< options.profiler_options().repository_path();
|
||||||
|
|
||||||
int delay_ms = 0;
|
|
||||||
for (const auto& kw : opts) {
|
for (const auto& kw : opts) {
|
||||||
std::string key = py::cast<std::string>(kw.first);
|
std::string key = py::cast<std::string>(kw.first);
|
||||||
if (key == "host_tracer_level") {
|
if (key == "host_tracer_level") {
|
||||||
@ -163,26 +162,15 @@ RemoteProfilerSessionManagerOptions GetOptionsLocked(absl::string_view logdir,
|
|||||||
VLOG(1) << "python_tracer_level set to " << value;
|
VLOG(1) << "python_tracer_level set to " << value;
|
||||||
} else if (key == "delay_ms") {
|
} else if (key == "delay_ms") {
|
||||||
if (!kw.second.is_none()) {
|
if (!kw.second.is_none()) {
|
||||||
delay_ms = py::cast<int>(kw.second);
|
auto value = py::cast<int>(kw.second);
|
||||||
|
options.set_delay_ms(value);
|
||||||
|
VLOG(1) << "delay_ms was set to " << value;
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
LOG(WARNING) << "Unrecognised key: " << key;
|
LOG(WARNING) << "Unrecognised key: " << key;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if (delay_ms) {
|
|
||||||
absl::Time start_timestamp = now + absl::Milliseconds(delay_ms);
|
|
||||||
tensorflow::int64 start_timestamp_ns = absl::ToUnixNanos(start_timestamp);
|
|
||||||
options.mutable_profiler_options()->set_start_timestamp_ns(
|
|
||||||
start_timestamp_ns);
|
|
||||||
LOG(INFO) << "delay_ms was " << delay_ms << ", start_timestamp_ns set to "
|
|
||||||
<< start_timestamp_ns << " [" << start_timestamp << "]";
|
|
||||||
} else {
|
|
||||||
DCHECK_EQ(options.mutable_profiler_options()->start_timestamp_ns(), 0);
|
|
||||||
LOG(INFO) << "Profiling will start immediately because delay_ms was unset "
|
|
||||||
"or zero.";
|
|
||||||
}
|
|
||||||
|
|
||||||
return options;
|
return options;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
Loading…
Reference in New Issue
Block a user