Include last warning and error messages in the worker error status in distributed TF.

Below is an example of the error message seen by the end user (Note the attached log messages):

InvalidArgumentError: From /job:worker/replica:0/task:2:
Integer division by zero
         [[node add_b (defined at /experimental/users/jingdong/tf_examples/test_partition_worker.py:23) ]]
Recent warning and error logs:
  W0411 13:45:01.091773   39609 worker.cc:206] Hardware tracing unavailable, continuing without it. Unavailable: Another profiling session active.
  W0411 13:45:01.092074   39612 worker.cc:206] Hardware tracing unavailable, continuing without it. Unavailable: Another profiling session active.
PiperOrigin-RevId: 249866046
This commit is contained in:
Jing Dong 2019-05-24 11:17:47 -07:00 committed by TensorFlower Gardener
parent 7a9d4a61f1
commit f1a61d3b31
12 changed files with 276 additions and 9 deletions

View File

@ -349,6 +349,8 @@ cc_library(
deps = [
":lib_platform",
"//tensorflow/core/platform/default/build_config:base",
"@com_google_absl//absl/base",
"@com_google_absl//absl/strings",
],
)

View File

@ -197,6 +197,11 @@ class ExecutorBarrier {
error_rendez->Ref();
}
if (!s.ok() && !StatusGroup::IsDerived(s) &&
!status_group_.HasLogMessages()) {
status_group_.AttachLogMessages();
}
status_group_.Update(s);
// If this is the last call to WhenDone, call the final callback

View File

@ -27,19 +27,27 @@ REGISTER_OP("Error")
.Output("out: T")
.Attr("T: type")
.Attr("message: string")
.Attr("log_error: bool = false")
.SetShapeFn(shape_inference::UnknownShape);
class ErrorOp : public OpKernel {
public:
explicit ErrorOp(OpKernelConstruction* ctx) : OpKernel(ctx) {
OP_REQUIRES_OK(ctx, ctx->GetAttr("message", &errmsg_));
OP_REQUIRES_OK(ctx, ctx->GetAttr("log_error", &log_error_));
}
void Compute(OpKernelContext* ctx) override {
// Log only when CancellationManager is set to skip logging when Compute()
// is called during the optimization phase.
if (ctx->cancellation_manager() && log_error_) {
LOG(ERROR) << "ErrorOp: " << errmsg_;
}
ctx->SetStatus(errors::Internal(errmsg_));
}
private:
string errmsg_;
bool log_error_ = false;
};
REGISTER_KERNEL_BUILDER(Name("Error").Device(DEVICE_CPU), ErrorOp);

View File

@ -648,6 +648,65 @@ TEST(GrpcSessionTest, Error) {
Env::Default()->SleepForMicroseconds(2000000);
}
TEST(GrpcSessionTest, ErrorStatusLog) {
std::unique_ptr<test::TestCluster> cluster;
TF_CHECK_OK(test::TestCluster::MakeTestCluster(Devices(1, 0), 2, &cluster));
const string& master = cluster->targets()[0];
const string& dev_a = cluster->devices()[0].name();
const string& dev_b = cluster->devices()[1].name();
LOG(INFO) << "master " << master << "dev_a " << dev_a << "dev_b " << dev_b;
GraphDef gdef;
std::vector<string> fetches;
{
Graph g(OpRegistry::Global());
// a2 = a + error(a)
//
// Subgraph for "a" fails. The master will cancel the subgraph for
// "b" and then returns the Session::Run.
auto a = test::graph::Constant(&g, Tensor());
a->set_assigned_device_name(dev_a);
auto a_err = test::graph::Error(&g, a, "fantasia!", true);
a_err->set_assigned_device_name(dev_a);
auto a2 = test::graph::Add(&g, a, a_err);
a2->set_assigned_device_name(dev_a);
fetches.push_back(a2->name());
// b2 = b + delay(b)
//
// Subgraph for "b" sleeps at the node "b_delay". When the sleep
// finishes, the subgraph "b" will continue execution till it
// notices that it is canceled. Meanwhile, subgraph's executor
// and its related state (registered ops) should still be alive.
auto b = test::graph::Constant(&g, Tensor());
b->set_assigned_device_name(dev_b);
auto b_delay = test::graph::Delay(&g, b, Microseconds(1000000));
b_delay->set_assigned_device_name(dev_b);
auto b2 = test::graph::Add(&g, b, b_delay);
b2->set_assigned_device_name(dev_b);
fetches.push_back(b2->name());
g.ToGraphDef(&gdef);
}
std::unique_ptr<Session> session(NewRemote(Options(master, 1)));
ASSERT_TRUE(session != nullptr);
TF_CHECK_OK(session->Create(gdef));
{
Status status = session->Run({}, fetches, {}, nullptr);
EXPECT_FALSE(status.ok());
std::cerr << status << "\n";
EXPECT_NE(status.ToString().find("fantasia!"), string::npos);
EXPECT_NE(status.ToString().find("ErrorOp: fantasia!"), string::npos);
}
// session->Close() shall clean up all states related to the session->
// E.g., deregisters subgraph with workers, etc.
TF_CHECK_OK(session->Close());
// Sleep a bit so that most of asynchronous works finishes before
// the test process finishes.
Env::Default()->SleepForMicroseconds(2000000);
}
TEST(GrpcSessionTest, LongErrorMessage) {
std::unique_ptr<test::TestCluster> cluster;
TF_CHECK_OK(test::TestCluster::MakeTestCluster(Devices(1, 0), 2, &cluster));

View File

@ -28,7 +28,12 @@ limitations under the License.
namespace tensorflow {
Worker::Worker(WorkerEnv* env) : env_(env), recent_request_ids_(100000) {}
Worker::Worker(WorkerEnv* env) : env_(env), recent_request_ids_(100000) {
// Enable log history collection in StatusGroup so that recent warning and
// error log messages will be attached to the root error status to be
// forwarded to the master.
StatusGroup::ConfigureLogHistory();
}
void Worker::GetStatusAsync(const GetStatusRequest* request,
GetStatusResponse* response, StatusCallback done) {

View File

@ -271,11 +271,12 @@ Node* Roll(Graph* g, Node* input, Node* shift, Node* axis) {
return ret;
}
Node* Error(Graph* g, Node* input, const string& errmsg) {
Node* Error(Graph* g, Node* input, const string& errmsg, bool log_error) {
Node* ret;
TF_CHECK_OK(NodeBuilder(g->NewName("n"), "Error")
.Input(input)
.Attr("message", errmsg)
.Attr("log_error", log_error)
.Finalize(g, &ret));
return ret;
}

View File

@ -131,7 +131,8 @@ Node* TruncatedNormal(Graph* g, Node* input, DataType dtype);
// Adds an error node in "g". The node's computation always
// generates an error with the given error message "errmsg".
Node* Error(Graph* g, Node* input, const string& errmsg);
Node* Error(Graph* g, Node* input, const string& errmsg,
bool log_error = false);
// Adds a node that generates a invalid ref output.
Node* InvalidRefType(Graph* g, DataType out_type, DataType invalid_type);

View File

@ -14,15 +14,78 @@ limitations under the License.
==============================================================================*/
#include "tensorflow/core/lib/core/status.h"
#include <stdio.h>
#include <deque>
#include <map>
#include "absl/base/call_once.h"
#include "tensorflow/core/lib/core/error_codes.pb.h"
#include "tensorflow/core/lib/strings/str_util.h"
#include "tensorflow/core/lib/strings/strcat.h"
#include "tensorflow/core/lib/strings/stringprintf.h"
#include "tensorflow/core/platform/mutex.h"
namespace tensorflow {
namespace {
// Log sink is used to collect recent warning and error log messages to be
// attached to the error status.
class StatusLogSink : public TFLogSink {
public:
static StatusLogSink* GetInstance() {
static StatusLogSink* sink = new StatusLogSink();
return sink;
}
void enable() {
absl::call_once(flag_, [this] {
num_messages_ = 5; // default to 5 messages
if (const char* num_msgs_str =
getenv("TF_WORKER_NUM_FORWARDED_LOG_MESSAGES")) {
if (!absl::SimpleAtoi(num_msgs_str, &num_messages_)) {
LOG(WARNING) << "Failed to parse env variable "
"TF_WORKER_NUM_WARNING_ERROR_LOG_IN_STATUS="
<< num_msgs_str << " as int. Using the default value "
<< num_messages_ << ".";
}
}
if (num_messages_ > 0) {
TFAddLogSink(this);
}
});
}
void GetMessages(std::vector<std::string>* logs) LOCKS_EXCLUDED(mu_) {
mutex_lock lock(mu_);
for (auto& msg : messages_) {
logs->push_back(msg);
}
}
void Send(const TFLogEntry& entry) override LOCKS_EXCLUDED(mu_) {
if (entry.log_severity() < absl::LogSeverity::kWarning) return;
mutex_lock lock(mu_);
messages_.emplace_back(entry.ToString());
if (messages_.size() > num_messages_) messages_.pop_front();
}
private:
mutex mu_;
// for allowing repeated/concurrent calls to enable()
absl::once_flag flag_;
int num_messages_ = 0;
std::deque<std::string> messages_ GUARDED_BY(mu_);
};
} // namespace
Status::Status(tensorflow::error::Code code, StringPiece msg) {
assert(code != tensorflow::error::OK);
state_ = std::unique_ptr<State>(new State);
@ -157,6 +220,10 @@ bool StatusGroup::IsDerived(const Status& s) {
return s.error_message().find(kDerivedMarker) != std::string::npos;
}
void StatusGroup::ConfigureLogHistory() {
StatusLogSink::GetInstance()->enable();
}
void StatusGroup::Update(const Status& s) {
if (s.ok()) {
++num_ok_;
@ -178,6 +245,7 @@ static std::vector<Status> GetNonDerivedStatuses(
}
static constexpr int kMaxAggregatedStatusMessageSize = 8 * 1024;
static constexpr int kMaxAttachedLogMessageSize = 512;
// Summarize all the status objects in the StatusGroup. This is used when
// individual Status objects in the StatusGroup are not already summarized.
@ -186,15 +254,32 @@ Status StatusGroup::as_summary_status() const {
return Status::OK();
}
// Gather recent logs as a string
auto get_recent_logs = [this]() -> std::string {
if (!recent_logs_.empty()) {
std::vector<std::string> fmt;
fmt.push_back("\nRecent warning and error logs:");
for (auto& log : recent_logs_) {
// Add an indentation to make it look nicer.
fmt.push_back(" " + log.substr(0, kMaxAttachedLogMessageSize));
}
return absl::StrJoin(fmt, "\n");
} else {
return "";
}
};
std::vector<Status> nonderived_statuses = GetNonDerivedStatuses(children_);
// If only one root status is found, return it directly.
// If only one root status is found, do not add summary header and footer.
if (nonderived_statuses.size() == 1) {
return nonderived_statuses[0];
return Status(nonderived_statuses[0].code(),
strings::StrCat(nonderived_statuses[0].error_message(),
get_recent_logs()));
}
if (!nonderived_statuses.empty()) {
std::vector<string> fmt;
std::vector<std::string> fmt;
fmt.push_back(strings::Printf("%zu root error(s) found.",
nonderived_statuses.size()));
@ -210,9 +295,11 @@ Status StatusGroup::as_summary_status() const {
strings::Printf("%zu derived errors ignored.",
children_.size() - nonderived_statuses.size()));
return Status(
nonderived_statuses[0].code(),
absl::StrJoin(fmt, "\n").substr(0, kMaxAggregatedStatusMessageSize));
std::string error_msg =
absl::StrJoin(fmt, "\n").substr(0, kMaxAggregatedStatusMessageSize);
return Status(nonderived_statuses[0].code(),
strings::StrCat(error_msg, get_recent_logs()));
} else {
// All statuses are derived. Pick the first available status to return.
return children_[0];
@ -250,4 +337,9 @@ Status StatusGroup::as_concatenated_status() const {
}
}
void StatusGroup::AttachLogMessages() {
recent_logs_.clear();
StatusLogSink::GetInstance()->GetMessages(&recent_logs_);
}
} // namespace tensorflow

View File

@ -105,6 +105,10 @@ class StatusGroup {
static Status MakeDerived(const Status& s);
static bool IsDerived(const Status& s);
// Enable warning and error log collection for appending to the aggregated
// status. This function may be called more than once.
static void ConfigureLogHistory();
// Return a merged status with combined child status messages with a summary.
Status as_summary_status() const;
// Return a merged status with combined child status messages with
@ -116,10 +120,15 @@ class StatusGroup {
// Augment this group with the child status `status`.
void Update(const Status& status);
// Attach recent warning and error log messages
void AttachLogMessages();
bool HasLogMessages() const { return !recent_logs_.empty(); }
private:
bool ok_ = true;
size_t num_ok_ = 0;
std::vector<Status> children_;
std::vector<std::string> recent_logs_; // recent warning and error logs
};
inline Status::Status(const Status& s)

View File

@ -31,6 +31,23 @@ limitations under the License.
#include <unordered_map>
namespace tensorflow {
void TFAddLogSink(TFLogSink* sink) {
// LogSink is not implemented.
// If necessary, one can add the log sink support as follows.
// 1. Define a global vector<TFLogSink> to keep track of all registered
// TFLogSink objects. Protect the global vector with mutex to make it
// thread-safe.
// 2. Add/remove elements from the global vector<TFLogSink> in TFAddLogSink
// and TFRemoveLogSink function
// 3. Add logic in LogMessage::GenerateLogMessage() below to dispatch log
// messages to all the registered log sinks.
}
void TFRemoveLogSink(TFLogSink* sink) {
// LogSink is not implemented.
}
namespace internal {
#if defined(PLATFORM_POSIX_ANDROID)

View File

@ -21,6 +21,9 @@ limitations under the License.
#include <limits>
#include <sstream>
#include "absl/base/log_severity.h"
#include "absl/strings/string_view.h"
#include "tensorflow/core/platform/macros.h"
#include "tensorflow/core/platform/types.h"
@ -341,6 +344,59 @@ int64 MinLogLevelFromEnv();
int64 MinVLogLevelFromEnv();
} // namespace internal
// LogSink support adapted from //base/logging.h
//
// `LogSink` is an interface which can be extended to intercept and process
// all log messages. LogSink implementations must be thread-safe. A single
// instance will be called from whichever thread is performing a logging
// operation.
class TFLogEntry {
static absl::LogSeverity AsAbslLogSecurity(int severity) {
return static_cast<absl::LogSeverity>(severity);
}
public:
explicit TFLogEntry(int severity, absl::string_view log_line)
: severity_(AsAbslLogSecurity(severity)), log_line_(log_line) {}
absl::LogSeverity log_severity() const { return severity_; }
std::string ToString() const { return std::string(log_line_); }
private:
const absl::LogSeverity severity_;
const absl::string_view log_line_;
};
class TFLogSink {
public:
virtual ~TFLogSink() = default;
// `Send` is called synchronously during the log statement. The logging
// module guarantees not to call `Send` concurrently on the same log sink.
// Implementations should be careful not to call`LOG` or `CHECK` or take
// any locks that might be held by the `LOG` caller, to avoid deadlock.
//
// `e` is guaranteed to remain valid until the subsequent call to
// `WaitTillSent` completes, so implementations may store a pointer to or
// copy of `e` (e.g. in a thread local variable) for use in `WaitTillSent`.
virtual void Send(const TFLogEntry& entry) = 0;
// `WaitTillSent` blocks the calling thread (the thread that generated a log
// message) until the sink has finished processing the log message.
// `WaitTillSent` is called once per log message, following the call to
// `Send`. This may be useful when log messages are buffered or processed
// asynchronously by an expensive log sink.
// The default implementation returns immediately. Like `Send`,
// implementations should be careful not to call `LOG` or `CHECK or take any
// locks that might be held by the `LOG` caller, to avoid deadlock.
virtual void WaitTillSent() {}
};
// Add or remove a `LogSink` as a consumer of logging data. Thread-safe.
void TFAddLogSink(TFLogSink* sink);
void TFRemoveLogSink(TFLogSink* sink);
} // namespace tensorflow
#endif // TENSORFLOW_CORE_PLATFORM_DEFAULT_LOGGING_H_

View File

@ -21,6 +21,18 @@ limitations under the License.
#if defined(PLATFORM_GOOGLE) || defined(PLATFORM_GOOGLE_ANDROID) || \
defined(GOOGLE_LOGGING) || defined(__EMSCRIPTEN__)
#include "tensorflow/core/platform/google/build_config/logging.h"
namespace tensorflow {
// Adapt Google LogSink interface to the TF interface.
using TFLogSink = ::base_logging::LogSink;
using TFLogEntry = absl::LogEntry;
inline void TFAddLogSink(TFLogSink* sink) { ::base_logging::AddLogSink(sink); }
inline void TFRemoveLogSink(TFLogSink* sink) {
::base_logging::RemoveLogSink(sink);
}
} // namespace tensorflow
#else
#include "tensorflow/core/platform/default/logging.h"
#endif