XLA open-sourcing: remove dependency on util/process/subprocess.h.

Add to TensorFlow a subset of the functionality of SubProcess to support the
open-sourceable parts of XLA and support TensorFlow's own testing.

On google3 let it use the internal SubProcess from google3.  In the open-source
world, use the TF implementation of the feature subset.

Modify XLA and some TF tests to use the TF SubProcess.
Change: 139114815
This commit is contained in:
A. Unique TensorFlower 2016-11-14 13:40:50 -08:00 committed by TensorFlower Gardener
parent dd8f0ded60
commit 3b9bbb977e
8 changed files with 850 additions and 74 deletions

View File

@ -214,6 +214,7 @@ cc_library(
"platform/protobuf.h",
"platform/stacktrace.h",
"platform/strong_hash.h",
"platform/subprocess.h",
"platform/thread_annotations.h",
"platform/types.h",
],
@ -1539,6 +1540,7 @@ tf_cc_tests(
"platform/net_test.cc",
"platform/port_test.cc",
"platform/profile_utils/cpu_utils_test.cc",
"platform/subprocess_test.cc",
],
deps = [
":lib",

View File

@ -60,7 +60,7 @@ class TestCluster {
private:
TestCluster() = default;
std::vector<std::unique_ptr<testing::SubProcess>> subprocesses_;
std::vector<std::unique_ptr<SubProcess>> subprocesses_;
std::vector<string> targets_;
std::vector<DeviceAttributes> devices_;

View File

@ -0,0 +1,464 @@
/* Copyright 2016 The TensorFlow Authors. All Rights Reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
==============================================================================*/
#include <fcntl.h>
#include <poll.h>
#include <signal.h>
#include <stdlib.h>
#include <string.h>
#include <sys/types.h>
#include <sys/wait.h>
#include "tensorflow/core/platform/logging.h"
#include "tensorflow/core/platform/subprocess.h"
// 1) FYI from m3b@ about fork():
// A danger of calling fork() (as opposed to clone() or vfork()) is that if
// many people have used pthread_atfork() to acquire locks, fork() can deadlock,
// because it's unlikely that the locking order will be correct in a large
// programme where different layers are unaware of one another and using
// pthread_atfork() independently.
//
// The danger of not calling fork() is that if libc managed to use
// pthread_atfork() correctly (for example, to lock the environment), you'd
// miss out on that protection. (But as far as I can see most libc's don't get
// that right; certainly glibc doesn't seem to.)
//
// clone() or vfork() are also frustrating because clone() exists only on Linux,
// and both clone(...CLONE_VM...) and vfork() have interesting issues around
// signals being delivered after the fork and before the exec. It may be
// possible to work around the latter by blocking all signals before the fork
// and unblocking them afterwards.
//
// Fortunately, most people haven't heard of pthread_atfork().
//
//
// 2) FYI from m3b@ about execv():
// The execv() call implicitly uses the libc global variable environ, which was
// copied by fork(), and that copy could have raced with a setenv() call in
// another thread, since libc implementations are usually not very careful about
// this. (glibc isn't careful, for example.)
//
// If this were inside libc, we could use locks or memory barriers to avoid the
// race, but as it is, I see nothing you can do. Even if you tried to copy the
// environment before the fork(), the copying could race with other threads
// calling setenv(). The good news is that few people call setenv().
//
// Amusingly, the standard says of fork(): "...to avoid errors, the child
// process may only execute async-signal-safe operations until such time as one
// of the exec functions is called." Notice that execve() is listed as
// async-signal-safe, but execv() is not, and the difference is just the
// handling of the environment.
namespace tensorflow {
SubProcess::SubProcess(int nfds)
: running_(false), pid_(-1), exec_path_(nullptr), exec_argv_(nullptr) {
// The input 'nfds' parameter is currently ignored and the internal constant
// 'kNFds' is used to support the 3 channels (stdin, stdout, stderr).
for (int i = 0; i < kNFds; i++) {
action_[i] = ACTION_CLOSE;
parent_pipe_[i] = -1;
child_pipe_[i] = -1;
}
}
SubProcess::~SubProcess() {
mutex_lock procLock(proc_mu_);
mutex_lock dataLock(data_mu_);
pid_ = -1;
running_ = false;
FreeArgs();
ClosePipes();
}
void SubProcess::FreeArgs() {
free(exec_path_);
exec_path_ = nullptr;
if (exec_argv_) {
for (char** p = exec_argv_; *p != nullptr; p++) {
free(*p);
}
delete[] exec_argv_;
exec_argv_ = nullptr;
}
}
void SubProcess::ClosePipes() {
for (int i = 0; i < kNFds; i++) {
if (parent_pipe_[i] >= 0) {
close(parent_pipe_[i]);
parent_pipe_[i] = -1;
}
if (child_pipe_[i] >= 0) {
close(child_pipe_[i]);
child_pipe_[i] = -1;
}
}
}
void SubProcess::SetProgram(const string& file,
const std::vector<string>& argv) {
mutex_lock procLock(proc_mu_);
mutex_lock dataLock(data_mu_);
if (running_) {
LOG(FATAL) << "SetProgram called after the process was started.";
return;
}
FreeArgs();
exec_path_ = strdup(file.c_str());
if (exec_path_ == nullptr) {
LOG(FATAL) << "SetProgram failed to allocate file string.";
return;
}
int argc = argv.size();
exec_argv_ = new char*[argc + 1];
for (int i = 0; i < argc; i++) {
exec_argv_[i] = strdup(argv[i].c_str());
if (exec_argv_[i] == nullptr) {
LOG(FATAL) << "SetProgram failed to allocate command argument.";
return;
}
}
exec_argv_[argc] = nullptr;
}
void SubProcess::SetChannelAction(Channel chan, ChannelAction action) {
mutex_lock procLock(proc_mu_);
mutex_lock dataLock(data_mu_);
if (running_) {
LOG(FATAL) << "SetChannelAction called after the process was started.";
} else if (!chan_valid(chan)) {
LOG(FATAL) << "SetChannelAction called with invalid channel: " << chan;
} else if ((action != ACTION_CLOSE) && (action != ACTION_PIPE) &&
(action != ACTION_DUPPARENT)) {
LOG(FATAL) << "SetChannelAction called with invalid action: " << action;
} else {
action_[chan] = action;
}
}
bool SubProcess::Start() {
mutex_lock procLock(proc_mu_);
mutex_lock dataLock(data_mu_);
if (running_) {
LOG(ERROR) << "Start called after the process was started.";
return false;
}
if ((exec_path_ == nullptr) || (exec_argv_ == nullptr)) {
LOG(ERROR) << "Start called without setting a program.";
return false;
}
// Create parent/child pipes for the specified channels and make the
// parent-side of the pipes non-blocking.
for (int i = 0; i < kNFds; i++) {
if (action_[i] == ACTION_PIPE) {
int pipe_fds[2];
if (pipe(pipe_fds) < 0) {
LOG(ERROR) << "Start cannot create pipe: " << strerror(errno);
ClosePipes();
return false;
}
// Handle the direction of the pipe (stdin vs stdout/err).
if (i == 0) {
parent_pipe_[i] = pipe_fds[1];
child_pipe_[i] = pipe_fds[0];
} else {
parent_pipe_[i] = pipe_fds[0];
child_pipe_[i] = pipe_fds[1];
}
if (fcntl(parent_pipe_[i], F_SETFL, O_NONBLOCK) < 0) {
LOG(ERROR) << "Start cannot make pipe non-blocking: "
<< strerror(errno);
ClosePipes();
return false;
}
if (fcntl(parent_pipe_[i], F_SETFD, FD_CLOEXEC) < 0) {
LOG(ERROR) << "Start cannot make pipe close-on-exec: "
<< strerror(errno);
ClosePipes();
return false;
}
}
}
// Start the child process and setup the file descriptors of both processes.
// See comment (1) in the header about issues with the use of fork().
pid_ = fork();
if (pid_ < 0) {
LOG(ERROR) << "Start cannot fork() child process: " << strerror(errno);
ClosePipes();
return false;
}
if (pid_ > 0) {
// Parent process: close the child-side pipes and return.
running_ = true;
for (int i = 0; i < kNFds; i++) {
if (child_pipe_[i] >= 0) {
close(child_pipe_[i]);
child_pipe_[i] = -1;
}
}
return true;
}
// Child process: close parent-side pipes and channels marked for closing.
// For pipe channels, replace their file descriptors with the pipes.
int devnull_fd = -1;
for (int i = 0; i < kNFds; i++) {
if (parent_pipe_[i] >= 0) {
close(parent_pipe_[i]);
parent_pipe_[i] = -1;
}
switch (action_[i]) {
case ACTION_DUPPARENT:
// Nothing to do, fork() took care of it.
break;
case ACTION_PIPE:
while (dup2(child_pipe_[i], i) < 0) {
if (!retry(errno)) {
_exit(1);
}
}
close(child_pipe_[i]);
child_pipe_[i] = -1;
break;
case ACTION_CLOSE:
default:
// Do not close stdin/out/err, instead redirect them to /dev/null so
// their file descriptors remain unavailable for reuse by open(), etc.
if (i <= CHAN_STDERR) {
if (devnull_fd < 0) {
while ((devnull_fd = open("/dev/null", O_RDWR, 0)) < 0) {
if (!retry(errno)) {
_exit(1);
}
}
}
while (dup2(devnull_fd, i) < 0) {
if (!retry(errno)) {
_exit(1);
}
}
} else {
close(i);
}
break;
}
}
if (devnull_fd >= 0) {
close(devnull_fd);
}
// Execute the child program.
// See comment (2) in the header about issues with the use of execv().
execv(exec_path_, exec_argv_);
_exit(1);
}
bool SubProcess::Wait() {
int status;
return WaitInternal(&status);
}
bool SubProcess::WaitInternal(int* status) {
// The waiter must release proc_mu_ while waiting in order for Kill() to work.
proc_mu_.lock();
bool running = running_;
pid_t pid = pid_;
proc_mu_.unlock();
bool ret = false;
if (running && (pid > 1)) {
pid_t cpid;
int cstat;
bool done = false;
while (!done) {
cpid = waitpid(pid, &cstat, 0);
if ((cpid < 0) && !retry(errno)) {
done = true;
} else if ((cpid == pid) && (WIFEXITED(cstat) || WIFSIGNALED(cstat))) {
*status = cstat;
ret = true;
done = true;
}
}
}
proc_mu_.lock();
if ((running_ == running) && (pid_ == pid)) {
running_ = false;
pid_ = -1;
}
proc_mu_.unlock();
return ret;
}
bool SubProcess::Kill(int signal) {
proc_mu_.lock();
bool running = running_;
pid_t pid = pid_;
proc_mu_.unlock();
bool ret = false;
if (running && (pid > 1)) {
ret = (kill(pid, signal) == 0);
}
return ret;
}
int SubProcess::Communicate(const string* stdin_input, string* stdout_output,
string* stderr_output) {
struct pollfd fds[kNFds];
size_t nbytes[kNFds];
string* iobufs[kNFds];
int fd_count = 0;
proc_mu_.lock();
bool running = running_;
proc_mu_.unlock();
if (!running) {
LOG(ERROR) << "Communicate called without a running process.";
return 1;
}
// If SIGPIPE handling is the default action, change it to ignore SIGPIPE and
// keep it ignored, don't change it back. This is needed while communicating
// with the child process so the parent process can survive the death of the
// child process while it is writing to its stdin. If the application has
// registered a SIGPIPE handler, then let it deal with any signals generated
// by the premature death of the child process, don't overwrite its handler.
struct sigaction act;
if (sigaction(SIGPIPE, nullptr, &act) < 0) {
LOG(ERROR) << "Communicate cannot get SIGPIPE handler: " << strerror(errno);
return 1;
}
if (act.sa_handler == SIG_DFL) {
memset(&act, 0, sizeof(act));
act.sa_handler = SIG_IGN;
sigemptyset(&act.sa_mask);
if (sigaction(SIGPIPE, &act, nullptr) < 0) {
LOG(ERROR) << "Communicate cannot ignore SIGPIPE: " << strerror(errno);
return 1;
}
}
// Lock data_mu_ but not proc_mu_ while communicating with the child process
// in order for Kill() to be able to terminate the child from another thread.
data_mu_.lock();
// Initialize the poll() structures and buffer tracking.
for (int i = 0; i < kNFds; i++) {
if (action_[i] == ACTION_PIPE) {
switch (i) {
case CHAN_STDIN:
// Special case: if no data is given to send to the child process,
// close the pipe to unblock the child, and skip the file descriptor.
if (stdin_input == nullptr) {
close(parent_pipe_[i]);
parent_pipe_[i] = -1;
continue;
}
iobufs[fd_count] = const_cast<string*>(stdin_input);
break;
case CHAN_STDOUT:
iobufs[fd_count] = stdout_output;
break;
case CHAN_STDERR:
iobufs[fd_count] = stderr_output;
break;
default:
iobufs[fd_count] = nullptr;
break;
}
nbytes[fd_count] = 0;
fds[fd_count].fd = parent_pipe_[i];
fds[fd_count].events = (i > 0) ? POLLIN : POLLOUT;
fds[fd_count].revents = 0;
fd_count++;
}
}
// Loop communicating with the child process.
int fd_remain = fd_count;
char buf[4096];
while (fd_remain > 0) {
int n = poll(fds, fd_count, -1);
if ((n < 0) && !retry(errno)) {
LOG(ERROR) << "Communicate cannot poll(): " << strerror(errno);
fd_remain = 0;
} else if (n == 0) {
LOG(ERROR) << "Communicate cannot poll(): timeout not possible";
fd_remain = 0;
} else if (n > 0) {
// Handle the pipes ready for I/O.
for (int i = 0; i < fd_count; i++) {
if ((fds[i].revents & (POLLIN | POLLHUP)) != 0) {
// Read from one of the child's outputs.
ssize_t n = read(fds[i].fd, buf, sizeof(buf));
if (n > 0) {
if (iobufs[i] != nullptr) {
iobufs[i]->append(buf, n);
nbytes[i] += n;
}
} else if ((n == 0) || !retry(errno)) {
fds[i].fd = -1;
fd_remain--;
}
} else if ((fds[i].revents & POLLOUT) != 0) {
// Write to the child's stdin.
ssize_t n = iobufs[i]->size() - nbytes[i];
if (n > 0) {
n = write(fds[i].fd, iobufs[i]->c_str() + nbytes[i], n);
}
if (n >= 0) {
nbytes[i] += n;
if (nbytes[i] >= iobufs[i]->size()) {
fds[i].fd = -1;
fd_remain--;
// Close the child's stdin pipe to unblock the process.
close(parent_pipe_[CHAN_STDIN]);
parent_pipe_[CHAN_STDIN] = -1;
}
} else if (!retry(errno)) {
fds[i].fd = -1;
fd_remain--;
}
} else if ((fds[i].revents & POLLERR) != 0) {
fds[i].fd = -1;
fd_remain--;
}
}
}
}
data_mu_.unlock();
// Wait for the child process to exit and return its status.
int status;
return WaitInternal(&status) ? status : -1;
}
} // namespace tensorflow

View File

@ -0,0 +1,131 @@
/* Copyright 2016 The TensorFlow Authors. All Rights Reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
==============================================================================*/
#ifndef TENSORFLOW_PLATFORM_DEFAULT_SUBPROCESS_H_
#define TENSORFLOW_PLATFORM_DEFAULT_SUBPROCESS_H_
#include <errno.h>
#include <unistd.h>
#include <string>
#include <vector>
#include "tensorflow/core/platform/macros.h"
#include "tensorflow/core/platform/mutex.h"
#include "tensorflow/core/platform/types.h"
namespace tensorflow {
class SubProcess {
public:
// SubProcess()
// nfds: The number of file descriptors to use.
explicit SubProcess(int nfds = 3);
// Virtual for backwards compatibility; do not create new subclasses.
// It is illegal to delete the SubProcess within its exit callback.
virtual ~SubProcess();
// SetChannelAction()
// Set how to handle a channel. The default action is ACTION_CLOSE.
// The action is set for all subsequent processes, until SetChannel()
// is called again.
//
// SetChannel may not be called while the process is running.
//
// chan: Which channel this applies to.
// action: What to do with the channel.
// Virtual for backwards compatibility; do not create new subclasses.
virtual void SetChannelAction(Channel chan, ChannelAction action);
// SetProgram()
// Set up a program and argument list for execution, with the full
// "raw" argument list passed as a vector of strings. argv[0]
// should be the program name, just as in execv().
//
// file: The file containing the program. This must be an absolute path
// name - $PATH is not searched.
// argv: The argument list.
virtual void SetProgram(const string& file, const std::vector<string>& argv);
// Start()
// Run the command that was previously set up with SetProgram().
// The following are fatal programming errors:
// * Attempting to start when a process is already running.
// * Attempting to start without first setting the command.
// Note, however, that Start() does not try to validate that the binary
// does anything reasonable (e.g. exists or can execute); as such, you can
// specify a non-existent binary and Start() will still return true. You
// will get a failure from the process, but only after Start() returns.
//
// Return true normally, or false if the program couldn't be started
// because of some error.
// Virtual for backwards compatibility; do not create new subclasses.
virtual bool Start();
// Kill()
// Send the given signal to the process.
// Return true normally, or false if we couldn't send the signal - likely
// because the process doesn't exist.
virtual bool Kill(int signal);
// Wait()
// Block until the process exits.
// Return true normally, or false if the process wasn't running.
virtual bool Wait();
// Communicate()
// Read from stdout and stderr and writes to stdin until all pipes have
// closed, then waits for the process to exit.
// Note: Do NOT call Wait() after calling Communicate as it will always
// fail, since Communicate calls Wait() internally.
// 'stdin_input', 'stdout_output', and 'stderr_output' may be NULL.
// If this process is not configured to send stdout or stderr to pipes,
// the output strings will not be modified.
// If this process is not configured to take stdin from a pipe, stdin_input
// will be ignored.
// Returns the command's exit status.
virtual int Communicate(const string* stdin_input, string* stdout_output,
string* stderr_output);
private:
static const int kNFds = 3;
static bool chan_valid(int chan) { return ((chan >= 0) && (chan < kNFds)); }
static bool retry(int e) {
return ((e == EINTR) || (e == EAGAIN) || (e == EWOULDBLOCK));
}
void FreeArgs() EXCLUSIVE_LOCKS_REQUIRED(data_mu_);
void ClosePipes() EXCLUSIVE_LOCKS_REQUIRED(data_mu_);
bool WaitInternal(int* status);
// The separation between proc_mu_ and data_mu_ mutexes allows Kill() to be
// called by a thread while another thread is inside Wait() or Communicate().
mutable mutex proc_mu_;
bool running_ GUARDED_BY(proc_mu_);
pid_t pid_ GUARDED_BY(proc_mu_);
mutable mutex data_mu_ ACQUIRED_AFTER(proc_mu_);
char* exec_path_ GUARDED_BY(data_mu_);
char** exec_argv_ GUARDED_BY(data_mu_);
ChannelAction action_[kNFds] GUARDED_BY(data_mu_);
int parent_pipe_[kNFds] GUARDED_BY(data_mu_);
int child_pipe_[kNFds] GUARDED_BY(data_mu_);
TF_DISALLOW_COPY_AND_ASSIGN(SubProcess);
};
} // namespace tensorflow
#endif // TENSORFLOW_PLATFORM_DEFAULT_SUBPROCESS_H_

View File

@ -20,62 +20,17 @@ limitations under the License.
#include "tensorflow/core/lib/strings/strcat.h"
#include "tensorflow/core/platform/logging.h"
#include "tensorflow/core/platform/subprocess.h"
namespace tensorflow {
namespace testing {
namespace {
class PosixSubProcess : public SubProcess {
public:
PosixSubProcess(const std::vector<string>& argv) : argv_(argv), pid_(0) {}
~PosixSubProcess() override {}
bool Start() override {
if (pid_ != 0) {
LOG(ERROR) << "Tried to start process multiple times.";
return false;
}
pid_ = fork();
if (pid_ == 0) {
// We are in the child process.
const char* path = argv_[0].c_str();
const char** argv = new const char*[argv_.size() + 1];
int i = 0;
for (const string& arg : argv_) {
argv[i++] = arg.c_str();
}
argv[argv_.size()] = nullptr;
execv(path, (char* const*)argv);
// Never executes.
return true;
} else if (pid_ < 0) {
LOG(ERROR) << "Failed to fork process.";
return false;
} else {
// We are in the parent process and fork() was successful.
// TODO(mrry): Consider collecting stderr from the child.
return true;
}
}
bool Kill(int signal) override {
if (pid_ == 0) {
LOG(ERROR) << "Tried to kill process before starting it.";
return false;
}
return kill(pid_, signal) == 0;
}
private:
const std::vector<string> argv_;
pid_t pid_;
TF_DISALLOW_COPY_AND_ASSIGN(PosixSubProcess);
};
} // namespace
std::unique_ptr<SubProcess> CreateSubProcess(const std::vector<string>& argv) {
return std::unique_ptr<SubProcess>(new PosixSubProcess(argv));
std::unique_ptr<SubProcess> proc(new SubProcess());
proc->SetProgram(argv[0], argv);
proc->SetChannelAction(CHAN_STDERR, ACTION_DUPPARENT);
proc->SetChannelAction(CHAN_STDOUT, ACTION_DUPPARENT);
return proc;
}
int PickUnusedPortOrDie() { return internal::PickUnusedPortOrDie(); }

View File

@ -0,0 +1,61 @@
/* Copyright 2016 The TensorFlow Authors. All Rights Reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
==============================================================================*/
#ifndef TENSORFLOW_PLATFORM_SUBPROCESS_H_
#define TENSORFLOW_PLATFORM_SUBPROCESS_H_
namespace tensorflow {
// Channel identifiers.
enum Channel {
CHAN_STDIN = 0,
CHAN_STDOUT = 1,
CHAN_STDERR = 2,
};
// Specify how a channel is handled.
enum ChannelAction {
// Close the file descriptor when the process starts.
// This is the default behavior.
ACTION_CLOSE,
// Make a pipe to the channel. It is used in the Communicate() method to
// transfer data between the parent and child processes.
ACTION_PIPE,
// Duplicate the parent's file descriptor. Useful if stdout/stderr should
// go to the same place that the parent writes it.
ACTION_DUPPARENT,
};
// Supports spawning and killing child processes.
class SubProcess;
} // namespace tensorflow
#include "tensorflow/core/platform/platform.h"
#if defined(PLATFORM_GOOGLE)
#include "tensorflow/core/platform/google/subprocess.h"
#elif defined(PLATFORM_POSIX) || defined(PLATFORM_POSIX_ANDROID) || \
defined(PLATFORM_GOOGLE_ANDROID)
#include "tensorflow/core/platform/posix/subprocess.h"
#elif defined(PLATFORM_WINDOWS)
#error SubProcess not yet implemented for Windows
#else
#error Define the appropriate PLATFORM_<foo> macro for this platform
#endif
#endif // TENSORFLOW_PLATFORM_SUBPROCESS_H_

View File

@ -0,0 +1,184 @@
/* Copyright 2016 The TensorFlow Authors. All Rights Reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
==============================================================================*/
#include <sys/wait.h>
#include "tensorflow/core/lib/core/status_test_util.h"
#include "tensorflow/core/platform/subprocess.h"
#include "tensorflow/core/platform/test.h"
namespace tensorflow {
class SubProcessTest : public ::testing::Test {};
TEST_F(SubProcessTest, NoOutputNoComm) {
tensorflow::SubProcess proc;
proc.SetProgram("/bin/cat", {"cat", "/dev/null"});
EXPECT_TRUE(proc.Start());
EXPECT_TRUE(proc.Wait());
}
TEST_F(SubProcessTest, NoOutput) {
tensorflow::SubProcess proc;
proc.SetProgram("/bin/cat", {"cat", "/dev/null"});
proc.SetChannelAction(CHAN_STDOUT, ACTION_PIPE);
proc.SetChannelAction(CHAN_STDERR, ACTION_PIPE);
EXPECT_TRUE(proc.Start());
string out, err;
int status = proc.Communicate(nullptr, &out, &err);
EXPECT_TRUE(WIFEXITED(status));
EXPECT_EQ(0, WEXITSTATUS(status));
EXPECT_EQ("", out);
EXPECT_EQ("", err);
}
TEST_F(SubProcessTest, Stdout) {
tensorflow::SubProcess proc;
proc.SetProgram("/bin/echo", {"echo", "-n", "hello world"});
proc.SetChannelAction(CHAN_STDOUT, ACTION_PIPE);
proc.SetChannelAction(CHAN_STDERR, ACTION_PIPE);
EXPECT_TRUE(proc.Start());
string out, err;
int status = proc.Communicate(nullptr, &out, &err);
EXPECT_TRUE(WIFEXITED(status));
EXPECT_EQ(0, WEXITSTATUS(status));
EXPECT_EQ("hello world", out);
EXPECT_EQ("", err);
}
TEST_F(SubProcessTest, StdoutIgnored) {
tensorflow::SubProcess proc;
proc.SetProgram("/bin/echo", {"echo", "-n", "hello world"});
proc.SetChannelAction(CHAN_STDOUT, ACTION_PIPE);
proc.SetChannelAction(CHAN_STDERR, ACTION_PIPE);
EXPECT_TRUE(proc.Start());
int status = proc.Communicate(nullptr, nullptr, nullptr);
EXPECT_TRUE(WIFEXITED(status));
EXPECT_EQ(0, WEXITSTATUS(status));
}
TEST_F(SubProcessTest, Stderr) {
tensorflow::SubProcess proc;
proc.SetProgram("/bin/cat", {"cat", "/file_does_not_exist"});
proc.SetChannelAction(CHAN_STDOUT, ACTION_PIPE);
proc.SetChannelAction(CHAN_STDERR, ACTION_PIPE);
EXPECT_TRUE(proc.Start());
string out, err;
int status = proc.Communicate(nullptr, &out, &err);
EXPECT_TRUE(WIFEXITED(status));
EXPECT_EQ(1, WEXITSTATUS(status));
EXPECT_EQ("", out);
EXPECT_NE(string::npos, err.find("/file_does_not_exist"));
}
TEST_F(SubProcessTest, StderrIgnored) {
tensorflow::SubProcess proc;
proc.SetProgram("/bin/cat", {"cat", "/file_does_not_exist"});
proc.SetChannelAction(CHAN_STDOUT, ACTION_PIPE);
proc.SetChannelAction(CHAN_STDERR, ACTION_PIPE);
EXPECT_TRUE(proc.Start());
int status = proc.Communicate(nullptr, nullptr, nullptr);
EXPECT_TRUE(WIFEXITED(status));
EXPECT_EQ(1, WEXITSTATUS(status));
}
TEST_F(SubProcessTest, Stdin) {
tensorflow::SubProcess proc;
proc.SetProgram("/usr/bin/wc", {"wc", "-l"});
proc.SetChannelAction(CHAN_STDIN, ACTION_PIPE);
EXPECT_TRUE(proc.Start());
string in = "foobar\nbarfoo\nhaha\n";
int status = proc.Communicate(&in, nullptr, nullptr);
EXPECT_TRUE(WIFEXITED(status));
EXPECT_EQ(0, WEXITSTATUS(status));
}
TEST_F(SubProcessTest, StdinStdout) {
tensorflow::SubProcess proc;
proc.SetProgram("/usr/bin/wc", {"wc", "-l"});
proc.SetChannelAction(CHAN_STDIN, ACTION_PIPE);
proc.SetChannelAction(CHAN_STDOUT, ACTION_PIPE);
EXPECT_TRUE(proc.Start());
string in = "foobar\nbarfoo\nhaha\n";
string out;
int status = proc.Communicate(&in, &out, nullptr);
EXPECT_TRUE(WIFEXITED(status));
EXPECT_EQ(0, WEXITSTATUS(status));
int count = stoi(out);
EXPECT_EQ(3, count);
}
TEST_F(SubProcessTest, StdinChildExit) {
tensorflow::SubProcess proc;
proc.SetProgram("/bin/sleep", {"sleep", "0"});
proc.SetChannelAction(CHAN_STDIN, ACTION_PIPE);
EXPECT_TRUE(proc.Start());
// Verify that the parent handles the child exiting immediately as the
// parent is trying to write a large string to the child's stdin.
string in;
in.reserve(1000000);
for (int i = 0; i < 100000; i++) {
in += "hello xyz\n";
}
int status = proc.Communicate(&in, nullptr, nullptr);
EXPECT_TRUE(WIFEXITED(status));
EXPECT_EQ(0, WEXITSTATUS(status));
}
TEST_F(SubProcessTest, StdinStdoutOverlap) {
tensorflow::SubProcess proc;
proc.SetProgram("/bin/cat", {"cat"});
proc.SetChannelAction(CHAN_STDIN, ACTION_PIPE);
proc.SetChannelAction(CHAN_STDOUT, ACTION_PIPE);
EXPECT_TRUE(proc.Start());
// Verify that the parent handles multiplexed reading/writing to the child
// process. The string is large enough to exceed the buffering of the pipes.
string in;
in.reserve(1000000);
for (int i = 0; i < 100000; i++) {
in += "hello xyz\n";
}
string out;
int status = proc.Communicate(&in, &out, nullptr);
EXPECT_TRUE(WIFEXITED(status));
EXPECT_EQ(0, WEXITSTATUS(status));
EXPECT_EQ(in, out);
}
TEST_F(SubProcessTest, KillProc) {
tensorflow::SubProcess proc;
proc.SetProgram("/bin/cat", {"cat"});
proc.SetChannelAction(CHAN_STDIN, ACTION_PIPE);
proc.SetChannelAction(CHAN_STDOUT, ACTION_PIPE);
EXPECT_TRUE(proc.Start());
EXPECT_TRUE(proc.Kill(SIGKILL));
EXPECT_TRUE(proc.Wait());
EXPECT_FALSE(proc.Kill(SIGKILL));
}
} // namespace tensorflow

View File

@ -21,6 +21,7 @@ limitations under the License.
#include "tensorflow/core/platform/macros.h"
#include "tensorflow/core/platform/platform.h"
#include "tensorflow/core/platform/subprocess.h"
#include "tensorflow/core/platform/types.h"
// As of September 2016, we continue to attempt to avoid the use of gmock aka
@ -48,28 +49,6 @@ string TensorFlowSrcRoot();
// Returns the same value for the lifetime of the process.
int RandomSeed();
// Supports spawning and killing child processes, for use in
// multi-process testing.
class SubProcess {
public:
virtual ~SubProcess() {}
// Starts the subprocess. Returns true on success, otherwise false.
// NOTE: This method is not thread-safe.
virtual bool Start() = 0;
// Kills the subprocess with the given signal number. Returns true
// on success, otherwise false.
// NOTE: This method is not thread-safe.
virtual bool Kill(int signal) = 0;
protected:
SubProcess() {}
private:
TF_DISALLOW_COPY_AND_ASSIGN(SubProcess);
};
// Returns an object that represents a child process that will be
// launched with the given command-line arguments `argv`. The process
// must be explicitly started by calling the Start() method on the