Save gzipped trace events json by default.
PiperOrigin-RevId: 298465129 Change-Id: I79bb7265d3e4ded84effd3218ecf62716df997c6
This commit is contained in: parent 74598809cd, commit 6f19495b9b
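For context on the new output: with this change the trace-viewer data is no longer only attached to the ProfileResponse as a `trace` proto; it is converted with TraceEventsToJson and written as a gzipped JSON file named `<host>.trace.json.gz` under the profile run directory (`<logdir>/plugins/profile/<run>/`). A minimal sketch of inspecting such a file — the path is hypothetical, and the Chrome-trace-style "traceEvents" layout is an assumption about the JSON produced:

import gzip
import json

# Hypothetical path; the run and host components come from the profiler session.
trace_path = '/tmp/logdir/plugins/profile/2020_03_02_10_00_00/myhost.trace.json.gz'

# gzip.open in text mode decompresses on the fly, so json.load sees plain JSON.
with gzip.open(trace_path, 'rt', encoding='utf-8') as f:
    trace = json.load(f)

# Print the first few trace events (name, timestamp, duration), if present.
for event in trace.get('traceEvents', [])[:10]:
    print(event.get('name'), event.get('ts'), event.get('dur'))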
@@ -229,6 +229,7 @@ cc_library(
":op_stats_to_input_pipeline_analysis",
":op_stats_to_overview_page",
":op_stats_to_tf_stats",
":trace_events_to_json",
":xplane_to_op_stats",
":xplane_to_trace_events",
"//tensorflow/core:lib",
@@ -240,6 +241,7 @@ cc_library(
"//tensorflow/core/profiler/protobuf:overview_page_proto_cc",
"//tensorflow/core/profiler/protobuf:tf_stats_proto_cc",
"//tensorflow/core/profiler/protobuf:xplane_proto_cc",
"//tensorflow/core/profiler/rpc/client:save_profile",
"@com_google_absl//absl/container:flat_hash_set",
"@com_google_absl//absl/strings",
],
@@ -20,6 +20,7 @@ limitations under the License.
#include "tensorflow/core/profiler/convert/op_stats_to_input_pipeline_analysis.h"
#include "tensorflow/core/profiler/convert/op_stats_to_overview_page.h"
#include "tensorflow/core/profiler/convert/op_stats_to_tf_stats.h"
#include "tensorflow/core/profiler/convert/trace_events_to_json.h"
#include "tensorflow/core/profiler/convert/xplane_to_op_stats.h"
#include "tensorflow/core/profiler/convert/xplane_to_trace_events.h"
#include "tensorflow/core/profiler/profiler_service.pb.h"
@@ -30,6 +31,7 @@ limitations under the License.
#include "tensorflow/core/profiler/protobuf/overview_page.pb.h"
#include "tensorflow/core/profiler/protobuf/tf_stats.pb.h"
#include "tensorflow/core/profiler/protobuf/xplane.pb.h"
#include "tensorflow/core/profiler/rpc/client/save_profile.h"

namespace tensorflow {
namespace profiler {
@@ -57,24 +59,30 @@ void AddToolData(absl::string_view tool_name, const Proto& tool_output,

// Returns the tool name with extension.
string ToolName(absl::string_view tool) {
if (tool == kTraceViewer) return "trace";
if (tool == kTraceViewer) return "trace.json.gz";
return absl::StrCat(tool, ".pb");
}

} // namespace

void ConvertXSpaceToProfileResponse(const XSpace& xspace,
const ProfileRequest& req,
ProfileResponse* response) {
Status ConvertXSpaceToProfileResponse(const XSpace& xspace,
const ProfileRequest& req,
ProfileResponse* response) {
absl::flat_hash_set<absl::string_view> tools(req.tools().begin(),
req.tools().end());
if (tools.empty()) return;
if (tools.empty()) return Status::OK();
if (tools.contains(kTraceViewer)) {
Trace trace;
ConvertXSpaceToTraceEvents(xspace, &trace);
AddToolData(ToolName(kTraceViewer), trace, response);
if (trace.trace_events().empty()) {
response->set_empty_trace(true);
return Status::OK();
}
TF_RETURN_IF_ERROR(SaveGzippedToolDataToTensorboardProfile(
req.repository_root(), req.session_id(), req.host_name(),
ToolName(kTraceViewer), TraceEventsToJson(trace)));
// Trace viewer is the only tool, skip OpStats conversion.
if (tools.size() == 1) return;
if (tools.size() == 1) return Status::OK();
}
OpStats op_stats = ConvertXSpaceToOpStats(xspace);
HardwareType hw_type =
@@ -99,6 +107,7 @@ void ConvertXSpaceToProfileResponse(const XSpace& xspace,
if (tools.contains(kKernelStats)) {
AddToolData(ToolName(kKernelStats), op_stats.kernel_stats_db(), response);
}
return Status::OK();
}

} // namespace profiler
@@ -27,10 +27,11 @@ namespace profiler {
// Convert collected trace in XSpace format to tools data based on the
// specified list of tools, and save to ProfileResponse.
// The accepted tools are:
// "overview_page", "input_pipeline" and "tensorflow_stats".
void ConvertXSpaceToProfileResponse(const XSpace& xspace,
const ProfileRequest& req,
ProfileResponse* response);
// "overview_page", "input_pipeline", "tensorflow_stats", "kernel_stats"
// and "trace_viewer".
Status ConvertXSpaceToProfileResponse(const XSpace& xspace,
const ProfileRequest& req,
ProfileResponse* response);

} // namespace profiler
} // namespace tensorflow
@@ -66,7 +66,7 @@ TEST(ConvertXPlaneToProfileResponse, TraceViewer) {
CreateXSpace(&xspace);
ProfileRequest request;
ProfileResponse response;
ConvertXSpaceToProfileResponse(xspace, request, &response);
TF_CHECK_OK(ConvertXSpaceToProfileResponse(xspace, request, &response));
}

TEST(ConvertXPlaneToProfileResponse, OverviewPage) {
@@ -75,7 +75,7 @@ TEST(ConvertXPlaneToProfileResponse, OverviewPage) {
ProfileRequest request;
request.add_tools("overview_page");
ProfileResponse response;
ConvertXSpaceToProfileResponse(xspace, request, &response);
TF_CHECK_OK(ConvertXSpaceToProfileResponse(xspace, request, &response));
EXPECT_EQ(1, response.tool_data_size());
EXPECT_EQ("overview_page.pb", response.tool_data(/*index=*/0).name());
OverviewPage overview_page;
@@ -89,7 +89,7 @@ TEST(ConvertXPlaneToProfileResponse, InputPipeline) {
ProfileRequest request;
request.add_tools("input_pipeline");
ProfileResponse response;
ConvertXSpaceToProfileResponse(xspace, request, &response);
TF_CHECK_OK(ConvertXSpaceToProfileResponse(xspace, request, &response));
EXPECT_EQ(1, response.tool_data_size());
EXPECT_EQ("input_pipeline.pb", response.tool_data(/*index=*/0).name());
InputPipelineAnalysisResult input_pipeline;
@@ -103,7 +103,7 @@ TEST(ConvertXPlaneToProfileResponse, TensorflowStats) {
ProfileRequest request;
request.add_tools("tensorflow_stats");
ProfileResponse response;
ConvertXSpaceToProfileResponse(xspace, request, &response);
TF_CHECK_OK(ConvertXSpaceToProfileResponse(xspace, request, &response));
EXPECT_EQ(1, response.tool_data_size());
EXPECT_EQ("tensorflow_stats.pb", response.tool_data(/*index=*/0).name());
TfStatsDatabase tf_stats_db;
@@ -52,12 +52,8 @@ ProfileRequest PopulateProfileRequest(int duration_ms,
ProfileRequest request;
request.set_duration_ms(duration_ms);
request.set_max_events(kMaxEvents);
if (absl::StartsWith(repository_root, "gs://")) {
// For backward compatibilities, only generate tracetable etc when the
// user provide a GCS path for model directory.
request.set_repository_root(repository_root);
request.set_session_id(session_id);
}
request.set_repository_root(repository_root);
request.set_session_id(session_id);
request.add_tools("trace_viewer");
request.add_tools("op_profile");
request.add_tools("input_pipeline");
@@ -94,11 +90,12 @@ Status Profile(const string& service_addr, const string& logdir,
const ProfileOptions& opts) {
ProfileRequest request =
PopulateProfileRequest(duration_ms, logdir, session_id, opts);
std::vector<string> parts = absl::StrSplit(service_addr, ':');
request.set_host_name(parts[0]);

::grpc::ClientContext context;
::grpc::ChannelArguments channel_args;
// TODO(qiuminxu): use `NewHostPortGrpcChannel` instead once their
// `ValidateHostPortPair` checks for empty host string case.
channel_args.SetInt(GRPC_ARG_MAX_MESSAGE_LENGTH,
std::numeric_limits<int32>::max());
std::unique_ptr<grpc::ProfilerService::Stub> stub =
@@ -110,8 +107,8 @@ Status Profile(const string& service_addr, const string& logdir,
FromGrpcStatus(stub->Profile(&context, request, &response)));

if (!response.empty_trace()) {
TF_CHECK_OK(
SaveTensorboardProfile(logdir, session_id, "", response, &std::cout));
TF_CHECK_OK(SaveTensorboardProfile(logdir, session_id, request.host_name(),
response, &std::cout));
// Print this at the end so that it's not buried in irrelevant LOG messages.
std::cout
<< "NOTE: using the trace duration " << duration_ms << "ms.\n"
@@ -145,7 +142,6 @@ Status NewSession(const string& service_addr, const string& repository_root,
::grpc::ClientContext context;
::grpc::ChannelArguments channel_args;
// TODO(qiuminxu): use `NewHostPortGrpcChannel` instead once their
// `ValidateHostPortPair` checks for empty host string case.
channel_args.SetMaxReceiveMessageSize(std::numeric_limits<int32>::max());
// TODO(jiesun): GRPC support following relevant naming scheme:
// 1. dns:///host:port
@@ -22,6 +22,7 @@ limitations under the License.
#include "absl/strings/match.h"
#include "absl/strings/str_cat.h"
#include "tensorflow/core/lib/core/errors.h"
#include "tensorflow/core/lib/io/compression.h"
#include "tensorflow/core/lib/io/path.h"
#include "tensorflow/core/platform/env.h"
#include "tensorflow/core/platform/protobuf.h"
@@ -40,12 +41,12 @@ using ::tensorflow::io::JoinPath;
constexpr char kProtoTraceFileName[] = "trace";
constexpr char kTfStatsHelperSuffix[] = "tf_stats_helper_result";

Status DumpToolDataToLogDirectory(StringPiece run_dir,
const string& host_prefix,
Status DumpToolDataToLogDirectory(StringPiece run_dir, const string& host,
const ProfileToolData& tool,
std::ostream* os) {
// Don't save the intermediate results for combining the per host tool data.
if (absl::EndsWith(tool.name(), kTfStatsHelperSuffix)) return Status::OK();
string host_prefix = host.empty() ? "" : absl::StrCat(host, ".");
string path = JoinPath(run_dir, absl::StrCat(host_prefix, tool.name()));
TF_RETURN_IF_ERROR(WriteStringToFile(Env::Default(), path, tool.data()));
if (os) {
@@ -72,6 +73,32 @@ Status MaybeCreateEmptyEventFile(const string& logdir) {
return event_writer.InitWithSuffix(kProfileEmptySuffix);
}

Status WriteGzippedDataToFile(const string& filepath, const string& data) {
std::unique_ptr<WritableFile> file;
TF_RETURN_IF_ERROR(Env::Default()->NewWritableFile(filepath, &file));
io::ZlibCompressionOptions options = io::ZlibCompressionOptions::GZIP();
io::ZlibOutputBuffer buffer(file.get(), options.input_buffer_size,
options.output_buffer_size, options);
TF_RETURN_IF_ERROR(buffer.Init());
TF_RETURN_IF_ERROR(buffer.Append(data));
TF_RETURN_IF_ERROR(buffer.Close());
TF_RETURN_IF_ERROR(file->Close());
return Status::OK();
}

Status GetOrCreateProfileRunDir(const string& logdir, const string& run,
string* profile_run_dir, std::ostream* os) {
// Dumps profile data to <logdir>/plugins/profile/<run>/.
*profile_run_dir = JoinPath(GetTensorBoardProfilePluginDir(logdir), run);
*os << "Creating directory: " << *profile_run_dir;
TF_RETURN_IF_ERROR(Env::Default()->RecursivelyCreateDir(*profile_run_dir));

// Creates an empty event file so that TensorBoard plugin logic can find
// the logdir.
TF_RETURN_IF_ERROR(MaybeCreateEmptyEventFile(logdir));
return Status::OK();
}

} // namespace

string GetTensorBoardProfilePluginDir(const string& logdir) {
@@ -84,22 +111,32 @@ Status SaveTensorboardProfile(const string& logdir, const string& run,
const string& host,
const ProfileResponse& response,
std::ostream* os) {
// Dumps profile data to <logdir>/plugins/profile/<run>/.
string host_prefix = host.empty() ? "" : absl::StrCat(host, ".");
string profile_run_dir =
JoinPath(GetTensorBoardProfilePluginDir(logdir), run);
*os << "Creating directory: " << profile_run_dir;
TF_RETURN_IF_ERROR(Env::Default()->RecursivelyCreateDir(profile_run_dir));

// Creates an empty event file so that TensorBoard plugin logic can find
// the logdir.
TF_RETURN_IF_ERROR(MaybeCreateEmptyEventFile(logdir));
string profile_run_dir;
TF_RETURN_IF_ERROR(
GetOrCreateProfileRunDir(logdir, run, &profile_run_dir, os));
for (const auto& tool_data : response.tool_data()) {
TF_RETURN_IF_ERROR(DumpToolDataToLogDirectory(profile_run_dir, host_prefix,
tool_data, os));
TF_RETURN_IF_ERROR(
DumpToolDataToLogDirectory(profile_run_dir, host, tool_data, os));
}
return Status::OK();
}

Status SaveGzippedToolDataToTensorboardProfile(const string& logdir,
const string& run,
const string& host,
const string& tool_name,
const string& data) {
string profile_run_dir;
std::stringstream ss;
Status status = GetOrCreateProfileRunDir(logdir, run, &profile_run_dir, &ss);
LOG(INFO) << ss.str();
TF_RETURN_IF_ERROR(status);
string host_prefix = host.empty() ? "" : absl::StrCat(host, ".");
string path = JoinPath(profile_run_dir, absl::StrCat(host_prefix, tool_name));
TF_RETURN_IF_ERROR(WriteGzippedDataToFile(path, data));
LOG(INFO) << "Dumped gzipped tool data for " << tool_name << " to " << path;
return Status::OK();
}

} // namespace profiler
} // namespace tensorflow
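The new WriteGzippedDataToFile helper above wraps a WritableFile in a ZlibOutputBuffer configured with ZlibCompressionOptions::GZIP(), so the tool data lands on disk as an ordinary gzip file. A rough Python analogue of that write-then-read roundtrip, shown only for illustration (the helper name and file path below are hypothetical, not part of this change):

import gzip

def write_gzipped_data_to_file(filepath, data):
    # Gzip-compress the payload on write, mirroring the C++ helper's behavior.
    with gzip.open(filepath, 'wt', encoding='utf-8') as f:
        f.write(data)

write_gzipped_data_to_file('/tmp/myhost.trace.json.gz', '{"traceEvents": []}')
with gzip.open('/tmp/myhost.trace.json.gz', 'rt', encoding='utf-8') as f:
    assert f.read() == '{"traceEvents": []}'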
@@ -34,6 +34,13 @@ Status SaveTensorboardProfile(const string& logdir, const string& run,
const ProfileResponse& response,
std::ostream* os);

// Gzip the data and save to the specified filepath.
Status SaveGzippedToolDataToTensorboardProfile(const string& logdir,
const string& run,
const string& host,
const string& tool_name,
const string& data);

} // namespace profiler
} // namespace tensorflow
@@ -36,7 +36,8 @@ Status CollectDataToResponse(const ProfileRequest& req,
ProfileResponse* response) {
profiler::XSpace xspace;
TF_RETURN_IF_ERROR(profiler->CollectData(&xspace));
profiler::ConvertXSpaceToProfileResponse(xspace, req, response);
TF_RETURN_IF_ERROR(
profiler::ConvertXSpaceToProfileResponse(xspace, req, response));
return Status::OK();
}
@@ -1829,7 +1829,7 @@ class TestTensorBoardV2NonParameterizedTest(keras_parameterized.TestCase):
for (dirpath, dirnames, filenames) in os.walk(profile_dir):
del dirnames # unused
for filename in filenames:
if filename.endswith('.trace'):
if filename.endswith('.trace.json.gz'):
return os.path.join(dirpath, filename)
return None
@@ -36,13 +36,18 @@ tensorflow::string GetCurrentTimeStampAsString() {
absl::LocalTimeZone());
}

tensorflow::ProfileRequest MakeProfileRequest() {
tensorflow::ProfileRequest MakeProfileRequest(
const tensorflow::string& logdir, const tensorflow::string& session_id,
const tensorflow::string& host) {
tensorflow::ProfileRequest request;
request.add_tools("trace_viewer");
request.add_tools("overview_page");
request.add_tools("input_pipeline");
request.add_tools("kernel_stats");
request.add_tools("tensorflow_stats");
request.set_host_name(host);
request.set_repository_root(logdir);
request.set_session_id(session_id);
return request;
}

@@ -76,15 +81,17 @@ class ProfilerSessionWrapper {
return;
}
tensorflow::ProfileResponse response;
tensorflow::profiler::ConvertXSpaceToProfileResponse(
xspace, MakeProfileRequest(), &response);
tensorflow::ProfileRequest request = MakeProfileRequest(
logdir_, GetCurrentTimeStampAsString(), tensorflow::port::Hostname());
tensorflow::profiler::ConvertXSpaceToProfileResponse(xspace, request,
&response);

std::stringstream ss; // Record LOG messages.
status = tensorflow::profiler::SaveTensorboardProfile(
logdir_, GetCurrentTimeStampAsString(), tensorflow::port::Hostname(),
request.repository_root(), request.session_id(), request.host_name(),
response, &ss);
LOG(INFO) << ss.str();
tensorflow::MaybeRaiseRegisteredFromStatus(tensorflow::Status::OK());
tensorflow::MaybeRaiseRegisteredFromStatus(status);
}

private:
@@ -21,10 +21,7 @@ from __future__ import print_function
import os
import socket

from tensorflow.core.protobuf import trace_events_pb2
from tensorflow.python.eager import profiler
from tensorflow.python.eager import test
from tensorflow.python.framework import config
from tensorflow.python.framework import constant_op
from tensorflow.python.framework import errors
from tensorflow.python.framework import test_util
@@ -74,19 +71,10 @@ class ProfilerTest(test_util.TensorFlowTestCase):
tensorflow_stats = os.path.join(profile_dir, run,
hostname + '.tensorflow_stats.pb')
self.assertTrue(gfile.Exists(tensorflow_stats))

trace_file = os.path.join(profile_dir, run, hostname + '.trace')
kernel_stats = os.path.join(profile_dir, run, hostname + '.kernel_stats.pb')
self.assertTrue(gfile.Exists(kernel_stats))
trace_file = os.path.join(profile_dir, run, hostname + '.trace.json.gz')
self.assertTrue(gfile.Exists(trace_file))
with gfile.Open(trace_file, 'rb') as f:
profile_pb = trace_events_pb2.Trace()
profile_pb.ParseFromString(f.read())
devices = frozenset(device.name for device in profile_pb.devices.values())
self.assertIn('/host:CPU', devices)
if config.list_physical_devices('GPU'):
self.assertIn('/device:GPU:0', devices)
events = frozenset(event.name for event in profile_pb.trace_events)
self.assertIn('three_times_five', events)
self.assertIn('Mul:Mul', events)


if __name__ == '__main__':