STT-tensorflow/tensorflow/python/profiler/internal/profiler_wrapper.cc
Yi Situ 25a6b8b2ed Add delayed start capability to profiler session.
PiperOrigin-RevId: 339490700
Change-Id: Ib06ca2d97009562dfd27563774f6d9db939e7e11
2020-10-28 17:43:03 -07:00

328 lines
14 KiB
C++

/* Copyright 2020 The TensorFlow Authors. All Rights Reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
==============================================================================*/
#include <memory>
#include <string>
#include <vector>
#include "absl/memory/memory.h"
#include "absl/strings/match.h"
#include "absl/strings/numbers.h"
#include "absl/strings/str_split.h"
#include "absl/strings/string_view.h"
#include "absl/strings/strip.h"
#include "absl/time/clock.h"
#include "absl/time/time.h"
#include "pybind11/pybind11.h"
#include "pybind11/pytypes.h"
#include "tensorflow/core/platform/env.h"
#include "tensorflow/core/platform/errors.h"
#include "tensorflow/core/platform/host_info.h"
#include "tensorflow/core/platform/status.h"
#include "tensorflow/core/platform/types.h"
#include "tensorflow/core/profiler/convert/xplane_to_tools_data.h"
#include "tensorflow/core/profiler/convert/xplane_to_trace_events.h"
#include "tensorflow/core/profiler/lib/profiler_session.h"
#include "tensorflow/core/profiler/protobuf/xplane.pb.h"
#include "tensorflow/core/profiler/rpc/client/capture_profile.h"
#include "tensorflow/core/profiler/rpc/client/save_profile.h"
#include "tensorflow/core/profiler/rpc/profiler_server.h"
#include "tensorflow/python/lib/core/pybind11_status.h"
namespace py = ::pybind11;
namespace {
using ::tensorflow::RemoteProfilerSessionManagerOptions;
// Minimum time budget granted to a profiling session. UpdateMaxSessionDuration
// uses this as the lower bound when it derives max_session_duration_ms from the
// requested profiling duration, so that short profiles still get a grace period
// in which to terminate.
constexpr absl::Duration kMinSessionGraceTime = absl::Seconds(60);
// Checks that `host_port` has the form "host:port".
//
// Returns InvalidArgument unless the string splits on ':' into exactly two
// pieces where the host piece is non-empty and contains no '/', and the port
// piece is accepted by absl::SimpleAtoi as a tensorflow::uint32. Returns OK
// otherwise.
tensorflow::Status ValidateHostPortPair(absl::string_view host_port) {
  const std::vector<absl::string_view> pieces = absl::StrSplit(host_port, ':');
  bool well_formed = pieces.size() == 2;
  if (well_formed) {
    const absl::string_view host = pieces[0];
    tensorflow::uint32 parsed_port;
    well_formed = absl::SimpleAtoi(pieces[1], &parsed_port) &&
                  !host.empty() && !absl::StrContains(host, "/");
  }
  if (!well_formed) {
    return tensorflow::errors::InvalidArgument(
        "Could not interpret \"", host_port, "\" as a host-port pair.");
  }
  return tensorflow::Status::OK();
}
// Sanity-checks the options of a remote profiling session.
//
// Returns InvalidArgument, in this order of precedence, when:
//   * no service address is present,
//   * the profiler duration_ms is zero,
//   * any service address is rejected by ValidateHostPortPair,
//   * max_session_duration_ms is smaller than the profiler duration_ms.
// Returns OK when every check passes.
tensorflow::Status ValidateOptions(
    const RemoteProfilerSessionManagerOptions& options) {
  const auto& addresses = options.service_addresses();
  if (addresses.empty()) {
    return tensorflow::errors::InvalidArgument("No service address provided.");
  }

  const auto requested_duration_ms = options.profiler_options().duration_ms();
  if (requested_duration_ms == 0) {
    return tensorflow::errors::InvalidArgument(
        "duration_ms must be greater than zero.");
  }

  for (absl::string_view address : addresses) {
    TF_RETURN_IF_ERROR(ValidateHostPortPair(address));
  }

  if (options.max_session_duration_ms() < requested_duration_ms) {
    return tensorflow::errors::InvalidArgument(
        "The maximum profiling session duration must be greater than or equal "
        "to the local profiler duration.");
  }
  return tensorflow::Status::OK();
}
// Receives a comma delimited list of service_addresses and appends each entry
// to RemoteProfilerSessionManagerOptions::service_addresses.
//
// Empty entries are skipped. Splitting an empty string yields a single empty
// piece, which would otherwise be stored as a bogus "" address; option
// validation would then never see an empty address list and would report a
// malformed host-port pair instead of a missing service address. Skipping
// empty pieces also tolerates stray leading, trailing or doubled commas.
void AddServiceAddresses(absl::string_view service_addresses,
                         RemoteProfilerSessionManagerOptions* options) {
  for (absl::string_view server :
       absl::StrSplit(service_addresses, ',', absl::SkipEmpty())) {
    options->add_service_addresses(server.data(), server.size());
  }
}
// Derives max_session_duration_ms, the overall time budget of the session that
// serves as the gRPC deadline, from the profiling duration and start delay.
//
// The budget is max(kMinSessionGraceTime, 2 * duration_ms), plus the time
// between session creation and the requested start timestamp when the start
// lies after session creation. Expects the profiler duration_ms to already be
// set to a positive value (DCHECK'd).
void UpdateMaxSessionDuration(RemoteProfilerSessionManagerOptions& options) {
  const auto local_profiler_duration = options.profiler_options().duration_ms();
  const auto session_creation_ts = options.session_creation_timestamp_ns();
  const auto requested_start_ts =
      options.profiler_options().start_timestamp_ns();
  // User only needs to set maximal session duration if the profiling duration
  // is bounded.
  DCHECK_GT(local_profiler_duration, 0);
  VLOG(3) << "duration_ms was given as " << local_profiler_duration;
  // Max session duration is the profiling session with grace time.
  const absl::Duration profile_duration = std::max(
      kMinSessionGraceTime, absl::Milliseconds(local_profiler_duration) * 2);
  // A requested start timestamp of 0 means profiling starts immediately. A
  // start at or before session creation likewise adds no delay. Comparing
  // before subtracting keeps the difference from going negative (or wrapping
  // around, if the timestamp fields are unsigned), which would otherwise shrink
  // or corrupt the session budget.
  absl::Duration delay_duration = absl::ZeroDuration();
  if (requested_start_ts > session_creation_ts) {
    delay_duration =
        absl::Nanoseconds(requested_start_ts - session_creation_ts);
  }
  const absl::Duration max_session_duration = profile_duration + delay_duration;
  options.set_max_session_duration_ms(
      absl::ToInt64Milliseconds(max_session_duration));
  VLOG(1) << "max_session_duration set to " << max_session_duration;
}
// Builds a RemoteProfilerSessionManagerOptions from a log directory and a
// py::dict of profiler options.
//
// Starts from ProfilerSession::DefaultOptions(), records the session creation
// timestamp, stores `logdir` as the repository path and then applies the
// recognised keys of `opts`: "host_tracer_level", "device_tracer_level" and
// "python_tracer_level" (cast to int), and "delay_ms" (cast to int, skipped
// when the value is None). Any other key only produces a warning.
//
// This must be called under GIL because it reads Python objects. Reading
// Python objects requires the GIL because the objects can be mutated by other
// Python threads. In addition, Python objects are reference counted; reading
// py::dict will increase its reference count.
RemoteProfilerSessionManagerOptions GetOptionsLocked(absl::string_view logdir,
                                                     const py::dict& opts) {
  RemoteProfilerSessionManagerOptions options;
  auto* profiler_options = options.mutable_profiler_options();
  *profiler_options = tensorflow::ProfilerSession::DefaultOptions();

  // Remember when this session was created; the gRPC deadline is later
  // computed relative to this timestamp.
  const absl::Time creation_time = absl::Now();
  options.set_session_creation_timestamp_ns(absl::ToUnixNanos(creation_time));
  VLOG(2) << "set_session_creation_timestamp_ns set to "
          << options.session_creation_timestamp_ns() << " [" << creation_time
          << "]";

  // XSpaces are stored under this path.
  profiler_options->set_repository_path(logdir.data(), logdir.size());
  VLOG(2) << "repository_path set to " << profiler_options->repository_path();

  for (const auto& entry : opts) {
    const std::string key = py::cast<std::string>(entry.first);
    if (key == "host_tracer_level") {
      const int level = py::cast<int>(entry.second);
      profiler_options->set_host_tracer_level(level);
      VLOG(1) << "host_tracer_level set to " << level;
    } else if (key == "device_tracer_level") {
      const int level = py::cast<int>(entry.second);
      profiler_options->set_device_tracer_level(level);
      VLOG(1) << "device_tracer_level set to " << level;
    } else if (key == "python_tracer_level") {
      const int level = py::cast<int>(entry.second);
      profiler_options->set_python_tracer_level(level);
      VLOG(1) << "python_tracer_level set to " << level;
    } else if (key == "delay_ms") {
      // A None delay leaves the field unset.
      if (entry.second.is_none()) continue;
      const int delay = py::cast<int>(entry.second);
      options.set_delay_ms(delay);
      VLOG(1) << "delay_ms was set to " << delay;
    } else {
      LOG(WARNING) << "Unrecognised key: " << key;
    }
  }
  return options;
}
// Builds the options for a remote profiling session.
//
// Layers the explicitly passed remote settings on top of the result of
// GetOptionsLocked(logdir, opts): the target service addresses,
// include_dataset_ops, duration_ms and the derived maximum session duration.
// `*is_cloud_tpu_session` is set to true iff `worker_list` is non-empty; in
// that case the workers, rather than `service_addresses`, become the targets.
// Reads Python objects through `opts`, so it must be called under GIL.
RemoteProfilerSessionManagerOptions GetOptionsLocked(
    absl::string_view service_addresses, absl::string_view logdir,
    absl::string_view worker_list, bool include_dataset_ops,
    tensorflow::int32 duration_ms, py::dict opts, bool* is_cloud_tpu_session) {
  RemoteProfilerSessionManagerOptions options = GetOptionsLocked(logdir, opts);

  // Remote profiling does not support any use cases where the following options
  // are set by `py::dict opts`. e.g. `opts['service_addrs']` will not happen.
  DCHECK(options.service_addresses().empty());
  // In remote profiling, duration is always passed by value explicitly and not
  // set in py::dict opts.
  DCHECK_EQ(options.profiler_options().duration_ms(), 0);
  // Because duration_ms is not set from py::dict opts, it follows that
  // max_session_duration_ms must be unset as well.
  DCHECK_EQ(options.max_session_duration_ms(), 0);

  // Worker_list is only used for TensorBoard TPU capture cases. For a TPU
  // cluster, service_address is the Master, which can already be found in the
  // list of workers. These sessions will be used with the ProfileAnalysis
  // service.
  const bool profiles_tpu_workers = !worker_list.empty();
  *is_cloud_tpu_session = profiles_tpu_workers;
  const absl::string_view targets =
      profiles_tpu_workers ? worker_list : service_addresses;
  AddServiceAddresses(targets, &options);

  // Set local profiler duration and profiler session durations.
  auto* profiler_options = options.mutable_profiler_options();
  profiler_options->set_include_dataset_ops(include_dataset_ops);
  profiler_options->set_duration_ms(duration_ms);
  UpdateMaxSessionDuration(options);

  int idx = 0;
  for (const auto& address : options.service_addresses()) {
    VLOG(1) << "service_addr " << idx << " set to " << address;
    ++idx;
  }
  VLOG(1) << "include_dataset_ops set to " << include_dataset_ops;
  VLOG(1) << "duration_ms set to " << duration_ms;
  return options;
}
class ProfilerSessionWrapper {
public:
void Start(const char* logdir, const py::dict& options) {
auto opts = GetOptionsLocked(logdir, options);
session_ = tensorflow::ProfilerSession::Create(opts.profiler_options());
logdir_ = logdir;
tensorflow::MaybeRaiseRegisteredFromStatus(session_->Status());
}
py::bytes Stop() {
tensorflow::string content;
if (session_ != nullptr) {
tensorflow::profiler::XSpace xspace;
tensorflow::Status status = session_->CollectData(&xspace);
session_.reset();
tensorflow::profiler::ConvertXSpaceToTraceEventsString(xspace, &content);
tensorflow::MaybeRaiseRegisteredFromStatus(status);
}
// The content is not valid UTF-8, so it must be converted to bytes.
return py::bytes(content);
}
void ExportToTensorBoard() {
if (!session_ || logdir_.empty()) return;
tensorflow::profiler::XSpace xspace;
tensorflow::Status status;
status = session_->CollectData(&xspace);
xspace.add_hostnames(tensorflow::port::Hostname());
session_.reset();
status = tensorflow::profiler::ExportToTensorBoard(xspace, logdir_);
tensorflow::MaybeRaiseRegisteredFromStatus(status);
}
private:
std::unique_ptr<tensorflow::ProfilerSession> session_;
tensorflow::string logdir_;
};
} // namespace
// Defines the `_pywrap_profiler` Python extension module: the ProfilerSession
// class and the start_server, trace, monitor and xspace_to_tools_data
// functions.
PYBIND11_MODULE(_pywrap_profiler, m) {
  // Local profiling session, backed by ProfilerSessionWrapper.
  py::class_<ProfilerSessionWrapper>(m, "ProfilerSession")
      .def(py::init<>())
      .def("start", &ProfilerSessionWrapper::Start)
      .def("stop", &ProfilerSessionWrapper::Stop)
      .def("export_to_tb", &ProfilerSessionWrapper::ExportToTensorBoard);

  // Starts a profiler server on `port`.
  m.def("start_server", [](int port) {
    auto server = absl::make_unique<tensorflow::profiler::ProfilerServer>();
    server->StartProfilerServer(port);
    // Intentionally release profiler server. Should transfer ownership to
    // caller instead.
    server.release();
  });

  // Profiles remote services and stores the result under `logdir`. Raises the
  // registered Python exception on invalid options or a failed trace.
  m.def("trace", [](const char* service_addr, const char* logdir,
                    const char* worker_list, bool include_dataset_ops,
                    int duration_ms, int num_tracing_attempts,
                    py::dict options) {
    // TPU capture is true if the user sets worker_list.
    bool is_cloud_tpu_session = false;
    // Normalize py::dict into a well defined and validated proto.
    tensorflow::RemoteProfilerSessionManagerOptions opts =
        GetOptionsLocked(service_addr, logdir, worker_list, include_dataset_ops,
                         duration_ms, options, &is_cloud_tpu_session);
    tensorflow::MaybeRaiseRegisteredFromStatus(ValidateOptions(opts));

    tensorflow::Status trace_status;
    {
      // Release the lock to keep the lock scope to a minimum, and allow
      // other threads to proceed.
      py::gil_scoped_release release;
      trace_status = tensorflow::profiler::Trace(
          logdir, num_tracing_attempts, opts, is_cloud_tpu_session);
    }
    tensorflow::MaybeRaiseRegisteredFromStatus(trace_status);
  });

  // Monitors the service at `service_addr` and returns the content produced by
  // tensorflow::profiler::Monitor. Raises the registered Python exception on a
  // malformed address or a failed request.
  m.def("monitor", [](const char* service_addr, int duration_ms,
                      int monitoring_level, bool display_timestamp) {
    tensorflow::MaybeRaiseRegisteredFromStatus(
        ValidateHostPortPair(service_addr));

    tensorflow::string content;
    tensorflow::Status monitor_status;
    {
      // Release the lock to keep the lock scope to a minimum, and allow
      // other threads to proceed.
      py::gil_scoped_release release;
      monitor_status = tensorflow::profiler::Monitor(
          service_addr, duration_ms, monitoring_level, display_timestamp,
          &content);
    }
    tensorflow::MaybeRaiseRegisteredFromStatus(monitor_status);
    return content;
  });

  // Converts the XSpace files at the given paths into the data of the named
  // tool. Returns a (bytes, bool) tuple built from the pair returned by
  // ConvertMultiXSpacesToToolData.
  m.def("xspace_to_tools_data",
        [](const py::list& xspace_path_list, const py::str& py_tool_name) {
          std::vector<std::string> xspace_paths;
          for (py::handle path : xspace_path_list) {
            xspace_paths.push_back(std::string(py::cast<py::str>(path)));
          }
          std::string tool_name = std::string(py_tool_name);
          auto converted = tensorflow::profiler::ConvertMultiXSpacesToToolData(
              xspace_paths, tool_name);
          return py::make_tuple(py::bytes(converted.first),
                                py::bool_(converted.second));
        });
}