From 25a6b8b2edde3b6b79edb5b2d62476347ec7d37f Mon Sep 17 00:00:00 2001
From: Yi Situ <yisitu@google.com>
Date: Wed, 28 Oct 2020 10:42:19 -0700
Subject: [PATCH 1/2] Add delayed start capability to profiler session.

PiperOrigin-RevId: 339490700
Change-Id: Ib06ca2d97009562dfd27563774f6d9db939e7e11
---
 tensorflow/core/profiler/lib/BUILD            |  1 +
 .../core/profiler/lib/profiler_session.cc     | 26 +++++++++++++++++--
 .../core/profiler/lib/profiler_session.h      |  2 +-
 .../core/profiler/profiler_options.proto      |  5 +++-
 tensorflow/core/profiler/rpc/client/BUILD     |  1 +
 .../profiler/rpc/client/capture_profile.cc    | 12 ++++++++-
 .../profiler/rpc/client/capture_profile.h     |  2 +-
 .../integration_test/profiler_api_test.py     | 13 ++++++++--
 .../profiler/internal/profiler_wrapper.cc     | 18 +++----------
 9 files changed, 57 insertions(+), 23 deletions(-)

diff --git a/tensorflow/core/profiler/lib/BUILD b/tensorflow/core/profiler/lib/BUILD
index 10e5df84345..25fdc35368a 100644
--- a/tensorflow/core/profiler/lib/BUILD
+++ b/tensorflow/core/profiler/lib/BUILD
@@ -55,6 +55,7 @@ cc_library(
         "//tensorflow/core/profiler/protobuf:xplane_proto_cc",
         "//tensorflow/core/profiler:profiler_options_proto_cc",
         "@com_google_absl//absl/memory",
+        "@com_google_absl//absl/time",
         "//tensorflow/core:protos_all_cc",
     ] + if_not_android([
         ":profiler_lock",
diff --git a/tensorflow/core/profiler/lib/profiler_session.cc b/tensorflow/core/profiler/lib/profiler_session.cc
index f37cb12ebab..c1b871025d6 100644
--- a/tensorflow/core/profiler/lib/profiler_session.cc
+++ b/tensorflow/core/profiler/lib/profiler_session.cc
@@ -18,7 +18,10 @@ limitations under the License.
 #include <memory>
 
 #include "absl/memory/memory.h"
+#include "absl/time/clock.h"
+#include "absl/time/time.h"
 #include "tensorflow/core/platform/env_time.h"
+#include "tensorflow/core/platform/errors.h"
 #include "tensorflow/core/platform/logging.h"
 #include "tensorflow/core/platform/mutex.h"
 #include "tensorflow/core/platform/platform.h"
@@ -65,6 +68,7 @@ tensorflow::Status ProfilerSession::Status() {
 Status ProfilerSession::CollectData(profiler::XSpace* space) {
   mutex_lock l(mutex_);
   if (!status_.ok()) return status_;
+  LOG(INFO) << "Profiler session collecting data.";
   for (auto& profiler : profilers_) {
     profiler->Stop().IgnoreError();
   }
@@ -116,7 +120,6 @@ ProfilerSession::ProfilerSession(ProfileOptions options)
 #else
     : active_(false),
 #endif
-      start_time_ns_(EnvTime::NowNanos()),
       options_(std::move(options)) {
   if (!active_) {
 #if !defined(IS_MOBILE_PLATFORM)
@@ -130,6 +133,25 @@ ProfilerSession::ProfilerSession(ProfileOptions options)
     return;
   }
 
+  LOG(INFO) << "Profiler session initializing.";
+  // Sleep until the scheduled start; start immediately if it already passed.
+  const bool delayed_start = options_.start_timestamp_ns() > 0;
+  if (delayed_start) {
+    absl::Time scheduled_start =
+        absl::FromUnixNanos(options_.start_timestamp_ns());
+    auto now = absl::Now();
+    if (scheduled_start < now) {
+      LOG(WARNING) << "Profiling is late (" << now
+                   << ") for the scheduled start (" << scheduled_start
+                   << ") and will start immediately.";
+    } else {
+      absl::Duration sleep_duration = scheduled_start - now;
+      LOG(INFO) << "Delaying start of profiler session by " << sleep_duration;
+      absl::SleepFor(sleep_duration);
+    }
+  }
+
+  start_time_ns_ = EnvTime::NowNanos();
   LOG(INFO) << "Profiler session started.";
 
 #if !defined(IS_MOBILE_PLATFORM)
@@ -147,7 +169,7 @@ ProfilerSession::ProfilerSession(ProfileOptions options)
 }
 
 ProfilerSession::~ProfilerSession() {
-  VLOG(1) << "Profiler session stopping.";
+  LOG(INFO) << "Profiler session tear down.";
   for (auto& profiler : profilers_) {
     profiler->Stop().IgnoreError();
   }
diff --git a/tensorflow/core/profiler/lib/profiler_session.h b/tensorflow/core/profiler/lib/profiler_session.h
index 976ebcfc884..2a852c92267 100644
--- a/tensorflow/core/profiler/lib/profiler_session.h
+++ b/tensorflow/core/profiler/lib/profiler_session.h
@@ -79,7 +79,7 @@ class ProfilerSession {
   bool active_ TF_GUARDED_BY(mutex_);
 
   tensorflow::Status status_ TF_GUARDED_BY(mutex_);
-  const uint64 start_time_ns_;
+  uint64 start_time_ns_;
   mutex mutex_;
   ProfileOptions options_;
 };
diff --git a/tensorflow/core/profiler/profiler_options.proto b/tensorflow/core/profiler/profiler_options.proto
index 7858f08c8ec..899bdb909b0 100644
--- a/tensorflow/core/profiler/profiler_options.proto
+++ b/tensorflow/core/profiler/profiler_options.proto
@@ -63,7 +63,7 @@ message ProfileOptions {
 }
 
 // Options for remote profiler session manager.
-// Next ID: 5
+// Next ID: 6
 message RemoteProfilerSessionManagerOptions {
   // Options for each local profiler.
   ProfileOptions profiler_options = 1;
@@ -79,4 +79,7 @@ message RemoteProfilerSessionManagerOptions {
   // continues until interrupted. Otherwise, value must be greater than
   // profiler_options.duration_ms.
   uint64 max_session_duration_ms = 4;
+
+  // Delay (in milliseconds) from when tracing is requested to when it starts.
+  uint64 delay_ms = 5;
 }
diff --git a/tensorflow/core/profiler/rpc/client/BUILD b/tensorflow/core/profiler/rpc/client/BUILD
index ca1ff506f2a..ea37a8852a0 100644
--- a/tensorflow/core/profiler/rpc/client/BUILD
+++ b/tensorflow/core/profiler/rpc/client/BUILD
@@ -37,6 +37,7 @@ cc_library(
         "//tensorflow/core/profiler/convert:xplane_to_profile_response",
         "//tensorflow/core/profiler/protobuf:xplane_proto_cc",
         "@com_google_absl//absl/strings",
+        "@com_google_absl//absl/time",
     ],
 )
 
diff --git a/tensorflow/core/profiler/rpc/client/capture_profile.cc b/tensorflow/core/profiler/rpc/client/capture_profile.cc
index 3f59d2ba265..19dba9da65d 100644
--- a/tensorflow/core/profiler/rpc/client/capture_profile.cc
+++ b/tensorflow/core/profiler/rpc/client/capture_profile.cc
@@ -21,6 +21,8 @@ limitations under the License.
 
 #include "absl/strings/str_join.h"
 #include "absl/strings/str_split.h"
+#include "absl/time/clock.h"
+#include "absl/time/time.h"
 #include "tensorflow/core/platform/errors.h"
 #include "tensorflow/core/platform/host_info.h"
 #include "tensorflow/core/platform/status.h"
@@ -170,7 +172,7 @@ Status NewSession(absl::string_view repository_root,
 }  // namespace
 
 Status Trace(const std::string& logdir, int num_tracing_attempts,
-             const RemoteProfilerSessionManagerOptions& opts,
+             RemoteProfilerSessionManagerOptions& opts,
              bool is_cloud_tpu_session) {
   DCHECK_GT(opts.profiler_options().duration_ms(), 0);
   DCHECK(!opts.service_addresses().empty());
@@ -184,6 +186,14 @@ Status Trace(const std::string& logdir, int num_tracing_attempts,
   Status status;
   int remaining_attempts = num_tracing_attempts;
   while (true) {
+    auto start_timestamp = absl::Now() + absl::Milliseconds(opts.delay_ms());
+    opts.mutable_profiler_options()->set_start_timestamp_ns(
+        absl::ToUnixNanos(start_timestamp));
+    LOG(INFO) << "Profiler delay_ms was " << opts.delay_ms()
+              << ", start_timestamp_ns set to "
+              << opts.profiler_options().start_timestamp_ns() << " ["
+              << start_timestamp << "]";
+
     std::cout << "Starting to trace for " << duration_ms << " ms. "
               << "Remaining attempt(s): " << --remaining_attempts << std::endl;
 
diff --git a/tensorflow/core/profiler/rpc/client/capture_profile.h b/tensorflow/core/profiler/rpc/client/capture_profile.h
index cb9183e28a7..96f3dafa06a 100644
--- a/tensorflow/core/profiler/rpc/client/capture_profile.h
+++ b/tensorflow/core/profiler/rpc/client/capture_profile.h
@@ -40,7 +40,7 @@ Status Monitor(const std::string& service_addr, int duration_ms,
 // in the given logdir. If no trace was collected, retries tracing for
 // num_tracing_attempts. Assumes that options have been validated.
 Status Trace(const std::string& logdir, int num_tracing_attempts,
-             const RemoteProfilerSessionManagerOptions& opts,
+             RemoteProfilerSessionManagerOptions& opts,
              bool is_cloud_tpu_session);
 
 }  // namespace profiler
diff --git a/tensorflow/python/profiler/integration_test/profiler_api_test.py b/tensorflow/python/profiler/integration_test/profiler_api_test.py
index 4e2a9dfd4e3..a491a086562 100644
--- a/tensorflow/python/profiler/integration_test/profiler_api_test.py
+++ b/tensorflow/python/profiler/integration_test/profiler_api_test.py
@@ -116,8 +116,17 @@ class ProfilerApiTest(test_util.TensorFlowTestCase):
     thread_worker.join(120)
     self._check_xspace_pb_exist(logdir)
 
-  def test_single_worker_sampling_mode_delayed(self):
-    """Test single worker sampling mode with delay."""
+  def test_single_worker_sampling_mode_short_delay(self):
+    """Test single worker sampling mode with a short delay.
+
+    Expect that requested delayed start time will arrive late, and a subsequent
+    retry will issue an immediate start.
+    """
+
+    self.test_single_worker_sampling_mode(delay_ms=1)
+
+  def test_single_worker_sampling_mode_long_delay(self):
+    """Test single worker sampling mode with a long delay."""
 
     self.test_single_worker_sampling_mode(delay_ms=1000)
 
diff --git a/tensorflow/python/profiler/internal/profiler_wrapper.cc b/tensorflow/python/profiler/internal/profiler_wrapper.cc
index 2b513547612..6827e4e32a5 100644
--- a/tensorflow/python/profiler/internal/profiler_wrapper.cc
+++ b/tensorflow/python/profiler/internal/profiler_wrapper.cc
@@ -146,7 +146,6 @@ RemoteProfilerSessionManagerOptions GetOptionsLocked(absl::string_view logdir,
   VLOG(2) << "repository_path set to "
           << options.profiler_options().repository_path();
 
-  int delay_ms = 0;
   for (const auto& kw : opts) {
     std::string key = py::cast<std::string>(kw.first);
     if (key == "host_tracer_level") {
@@ -163,26 +162,15 @@ RemoteProfilerSessionManagerOptions GetOptionsLocked(absl::string_view logdir,
       VLOG(1) << "python_tracer_level set to " << value;
     } else if (key == "delay_ms") {
       if (!kw.second.is_none()) {
-        delay_ms = py::cast<int>(kw.second);
+        auto value = py::cast<int>(kw.second);
+        options.set_delay_ms(value);
+        VLOG(1) << "delay_ms was set to " << value;
       }
     } else {
       LOG(WARNING) << "Unrecognised key: " << key;
     }
   }
 
-  if (delay_ms) {
-    absl::Time start_timestamp = now + absl::Milliseconds(delay_ms);
-    tensorflow::int64 start_timestamp_ns = absl::ToUnixNanos(start_timestamp);
-    options.mutable_profiler_options()->set_start_timestamp_ns(
-        start_timestamp_ns);
-    LOG(INFO) << "delay_ms was " << delay_ms << ", start_timestamp_ns set to "
-              << start_timestamp_ns << " [" << start_timestamp << "]";
-  } else {
-    DCHECK_EQ(options.mutable_profiler_options()->start_timestamp_ns(), 0);
-    LOG(INFO) << "Profiling will start immediately because delay_ms was unset "
-                 "or zero.";
-  }
-
   return options;
 }
 

From 17fae8cee663e5833072ec0c892d6fa6c3e98de3 Mon Sep 17 00:00:00 2001
From: Yi Situ <yisitu@google.com>
Date: Wed, 28 Oct 2020 15:04:09 -0700
Subject: [PATCH 2/2] Remove the flaky short-delay profiler API test.

PiperOrigin-RevId: 339545685
Change-Id: Ife8a0ee6a91825e094f3fdea5a3779da083b751b
---
 .../profiler/integration_test/profiler_api_test.py       | 9 ---------
 1 file changed, 9 deletions(-)

diff --git a/tensorflow/python/profiler/integration_test/profiler_api_test.py b/tensorflow/python/profiler/integration_test/profiler_api_test.py
index a491a086562..1d79b660ba5 100644
--- a/tensorflow/python/profiler/integration_test/profiler_api_test.py
+++ b/tensorflow/python/profiler/integration_test/profiler_api_test.py
@@ -116,15 +116,6 @@ class ProfilerApiTest(test_util.TensorFlowTestCase):
     thread_worker.join(120)
     self._check_xspace_pb_exist(logdir)
 
-  def test_single_worker_sampling_mode_short_delay(self):
-    """Test single worker sampling mode with a short delay.
-
-    Expect that requested delayed start time will arrive late, and a subsequent
-    retry will issue an immediate start.
-    """
-
-    self.test_single_worker_sampling_mode(delay_ms=1)
-
   def test_single_worker_sampling_mode_long_delay(self):
     """Test single worker sampling mode with a long delay."""