Improve the handling of remote worker threads. Also, sort root events before grouping.
PiperOrigin-RevId: 316133384 Change-Id: Ie8272331135b1d14d52e66a9933e3d7037ac2eb6
This commit is contained in:
parent
eb4af4527b
commit
795362accd
@ -285,6 +285,7 @@ cc_library(
|
||||
"//tensorflow/core:lib_internal",
|
||||
"//tensorflow/core/profiler/lib:connected_traceme",
|
||||
"//tensorflow/core/profiler/protobuf:xplane_proto_cc",
|
||||
"@com_google_absl//absl/algorithm:container",
|
||||
"@com_google_absl//absl/container:flat_hash_map",
|
||||
"@com_google_absl//absl/container:flat_hash_set",
|
||||
"@com_google_absl//absl/strings",
|
||||
|
@ -24,6 +24,7 @@ limitations under the License.
|
||||
#include <utility>
|
||||
#include <vector>
|
||||
|
||||
#include "absl/algorithm/container.h"
|
||||
#include "absl/container/flat_hash_map.h"
|
||||
#include "absl/container/flat_hash_set.h"
|
||||
#include "absl/strings/str_cat.h"
|
||||
@ -128,13 +129,6 @@ std::unique_ptr<XEvent> CreateVirtualEvent(const XStat& step_id_stat,
|
||||
return virtual_event;
|
||||
}
|
||||
|
||||
// Returns true iff `root_event_types` contains
// HostEventType::kAsyncExecutorTraceContext (linear scan via std::find).
bool NeedsVirtualEventsForAsyncExecutor(
|
||||
const std::vector<int64 /*EventType*/>& root_event_types) {
|
||||
return std::find(root_event_types.begin(), root_event_types.end(),
|
||||
HostEventType::kAsyncExecutorTraceContext) !=
|
||||
root_event_types.end();
|
||||
}
|
||||
|
||||
bool HasFunctionRun(EventNode* event_node) {
|
||||
for (EventNode* child : event_node->GetChildren()) {
|
||||
if (child->GetEventVisitor().Type() == HostEventType::kFunctionRun) {
|
||||
@ -254,6 +248,13 @@ bool IsTopRoot(const EventNode* event) {
|
||||
return true;
|
||||
}
|
||||
|
||||
// Sorts `*event_list` in place in ascending order of each node's
// GetEventVisitor().TimestampPs().
// NOTE(review): absl::c_sort presumably forwards to std::sort, which is not
// stable, so the relative order of events with equal timestamps would be
// unspecified -- confirm if deterministic tie-breaking matters to callers.
void SortEventList(EventList* event_list) {
|
||||
absl::c_sort(*event_list, [](const EventNode* e1, const EventNode* e2) {
|
||||
return e1->GetEventVisitor().TimestampPs() <
|
||||
e2->GetEventVisitor().TimestampPs();
|
||||
});
|
||||
}
|
||||
|
||||
} // namespace
|
||||
|
||||
EventNode::EventNode(const XPlaneVisitor* plane, XLine* raw_line,
|
||||
@ -488,6 +489,7 @@ void EventForest::CreateEventGroup() {
|
||||
return;
|
||||
}
|
||||
|
||||
SortEventList(&root_events_);
|
||||
for (EventNode* root_event : root_events_) {
|
||||
if (IsTopRoot(root_event)) {
|
||||
ProcessRootEvent(next_group_id_++, root_event, &event_group_name_map_);
|
||||
@ -579,23 +581,21 @@ void EventForest::ProcessTensorFlowLoop() {
|
||||
}
|
||||
}
|
||||
|
||||
void EventForest::CreateVirtualEventsForAsyncExecutor() {
|
||||
auto eager_kernel_execute_event_node_list =
|
||||
void EventForest::ProcessWorker() {
|
||||
auto eager_kernel_execute_event_list =
|
||||
gtl::FindOrNull(event_node_map_, HostEventType::kEagerKernelExecute);
|
||||
if (!eager_kernel_execute_event_node_list) return;
|
||||
EventNode* virtual_event_node = nullptr;
|
||||
for (auto& eager_kernel_execute_event_node :
|
||||
*eager_kernel_execute_event_node_list) {
|
||||
if (HasFunctionRun(eager_kernel_execute_event_node.get())) {
|
||||
auto new_virtual_event_node =
|
||||
absl::make_unique<EventNode>(*eager_kernel_execute_event_node);
|
||||
virtual_event_node = new_virtual_event_node.get();
|
||||
// event_node_map_ keeps new_virtual_event_node alive.
|
||||
event_node_map_[HostEventType::kAsyncExecutorTraceContext].push_back(
|
||||
std::move(new_virtual_event_node));
|
||||
}
|
||||
if (virtual_event_node) {
|
||||
virtual_event_node->AddChild(eager_kernel_execute_event_node.get());
|
||||
if (!eager_kernel_execute_event_list) return;
|
||||
// The last EagerKernelExecute with a FunctionRun child.
|
||||
EventNode* root_event = nullptr;
|
||||
for (auto& eager_kernel_execute_event : *eager_kernel_execute_event_list) {
|
||||
if (HasFunctionRun(eager_kernel_execute_event.get())) {
|
||||
// A function op becomes a new root.
|
||||
root_event = eager_kernel_execute_event.get();
|
||||
root_event->SetIsRoot(true);
|
||||
root_events_.push_back(root_event);
|
||||
} else if (root_event) {
|
||||
// Add non-function eager ops as child.
|
||||
root_event->AddChild(eager_kernel_execute_event.get());
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -615,9 +615,7 @@ EventForest::EventForest(
|
||||
ConnectInterThread(connect_info_list);
|
||||
ConnectContextGroups(context_groups);
|
||||
ProcessTensorFlowLoop();
|
||||
if (NeedsVirtualEventsForAsyncExecutor(root_event_types)) {
|
||||
CreateVirtualEventsForAsyncExecutor();
|
||||
}
|
||||
ProcessWorker();
|
||||
ProcessLegacyRootEvents(root_event_types);
|
||||
CreateEventGroup();
|
||||
MarkEagerlyExecutedGpuKernels();
|
||||
|
@ -186,10 +186,9 @@ class EventForest {
|
||||
// iteration to `tf_loop_root_events_`.
|
||||
void ProcessTensorFlowLoop();
|
||||
|
||||
// Creates virtual events of HostEventType::kAsyncExecutorTraceContext. A
|
||||
// virtual event is created for every FunctionRun and the following eager ops
|
||||
// (e.g., for Keras callback).
|
||||
void CreateVirtualEventsForAsyncExecutor();
|
||||
// Processes the worker thread by grouping a FunctionRun with the following
|
||||
// eager ops (e.g., for Keras callback).
|
||||
void ProcessWorker();
|
||||
|
||||
EventNodeMap event_node_map_;
|
||||
std::vector<XPlaneVisitor> visitors_;
|
||||
|
@ -470,6 +470,63 @@ TEST(GroupEventsTest, AsyncEventTest) {
|
||||
});
|
||||
}
|
||||
|
||||
// Checks grouping on a single line that holds four EagerKernelExecute events
// (start times 0, 200, 400, 600; duration 100) and two FunctionRun events
// (start times 210 and 610; duration 50), so each FunctionRun falls inside the
// time span of the second and fourth EagerKernelExecute respectively.
// Expected result after GroupTfEvents():
//   - events starting before 200 carry no kGroupId stat,
//   - events starting in [200, 600) carry group id 0,
//   - events starting at 600 or later carry group id 1.
TEST(GroupEventsTest, WorkerTest) {
|
||||
constexpr uint64 kEagerKernelExecuteDuration = 100;
|
||||
constexpr uint64 kFunctionRunDuration = 50;
|
||||
constexpr uint64 kFirstEagerKernelExecuteStartTime = 0;
|
||||
constexpr uint64 kSecondEagerKernelExecuteStartTime = 200;
|
||||
constexpr uint64 kThirdEagerKernelExecuteStartTime = 400;
|
||||
constexpr uint64 kFourthEagerKernelExecuteStartTime = 600;
|
||||
constexpr uint64 kFirstFunctionRunStartTime = 210;
|
||||
constexpr uint64 kSecondFunctionRunStartTime = 610;
|
||||
|
||||
XSpace raw_space;
|
||||
XPlane* raw_plane = raw_space.add_planes();
|
||||
XPlaneBuilder plane(raw_plane);
|
||||
plane.ReserveLines(1);
|
||||
auto line = plane.GetOrCreateLine(0);
|
||||
// Eager op. It doesn't belong to any group.
|
||||
CreateXEvent(&plane, &line, HostEventType::kEagerKernelExecute,
|
||||
kFirstEagerKernelExecuteStartTime, kEagerKernelExecuteDuration);
|
||||
// First function. It creates the first group.
|
||||
CreateXEvent(&plane, &line, HostEventType::kEagerKernelExecute,
|
||||
kSecondEagerKernelExecuteStartTime, kEagerKernelExecuteDuration);
|
||||
CreateXEvent(&plane, &line, HostEventType::kFunctionRun,
|
||||
kFirstFunctionRunStartTime, kFunctionRunDuration);
|
||||
// Eager op. It belongs to the first group.
|
||||
CreateXEvent(&plane, &line, HostEventType::kEagerKernelExecute,
|
||||
kThirdEagerKernelExecuteStartTime, kEagerKernelExecuteDuration);
|
||||
// Second function. It creates the second group.
|
||||
CreateXEvent(&plane, &line, HostEventType::kEagerKernelExecute,
|
||||
kFourthEagerKernelExecuteStartTime, kEagerKernelExecuteDuration);
|
||||
CreateXEvent(&plane, &line, HostEventType::kFunctionRun,
|
||||
kSecondFunctionRunStartTime, kFunctionRunDuration);
|
||||
|
||||
GroupTfEvents(&raw_space, /*event_group_name_map=*/nullptr);
|
||||
CreateTfXPlaneVisitor(raw_plane).ForEachLine(
|
||||
[&](const tensorflow::profiler::XLineVisitor& line) {
|
||||
// 4 EagerKernelExecute + 2 FunctionRun events were added above.
EXPECT_EQ(line.NumEvents(), 6);
|
||||
line.ForEachEvent(
|
||||
[&](const tensorflow::profiler::XEventVisitor& event) {
|
||||
absl::optional<int64> group_id;
|
||||
if (absl::optional<XStatVisitor> stat =
|
||||
event.GetStat(StatType::kGroupId)) {
|
||||
group_id = stat->IntValue();
|
||||
}
|
||||
// Classify each event by its start time to pick the expected group.
if (event.TimestampPs() < kSecondEagerKernelExecuteStartTime) {
|
||||
EXPECT_FALSE(group_id.has_value());
|
||||
} else if (event.TimestampPs() <
|
||||
kFourthEagerKernelExecuteStartTime) {
|
||||
EXPECT_TRUE(group_id.has_value());
|
||||
EXPECT_EQ(*group_id, 0);
|
||||
} else {
|
||||
EXPECT_TRUE(group_id.has_value());
|
||||
EXPECT_EQ(*group_id, 1);
|
||||
}
|
||||
});
|
||||
});
|
||||
}
|
||||
|
||||
} // namespace
|
||||
} // namespace profiler
|
||||
} // namespace tensorflow
|
||||
|
@ -107,8 +107,6 @@ const HostEventTypeMap& GetHostEventTypeMap() {
|
||||
// tf.data related.
|
||||
{"IteratorGetNextOp::DoCompute", kIteratorGetNextOp},
|
||||
{"IteratorGetNextAsOptionalOp::DoCompute", kIteratorGetNextAsOptionalOp},
|
||||
// Virtual events for grouping.
|
||||
{"AsyncExecutorTraceContext", kAsyncExecutorTraceContext},
|
||||
// GPU related.
|
||||
{"KernelLaunch", kKernelLaunch},
|
||||
{"KernelExecute", kKernelExecute},
|
||||
|
@ -100,8 +100,6 @@ enum HostEventType {
|
||||
// tf.data related.
|
||||
kIteratorGetNextOp,
|
||||
kIteratorGetNextAsOptionalOp,
|
||||
// Virtual events for grouping.
|
||||
kAsyncExecutorTraceContext,
|
||||
// GPU related.
|
||||
kKernelLaunch,
|
||||
kKernelExecute,
|
||||
|
Loading…
x
Reference in New Issue
Block a user