diff --git a/tensorflow/core/BUILD b/tensorflow/core/BUILD index 455774bf038..a1054472e39 100644 --- a/tensorflow/core/BUILD +++ b/tensorflow/core/BUILD @@ -214,6 +214,7 @@ cc_library( "platform/protobuf.h", "platform/stacktrace.h", "platform/strong_hash.h", + "platform/subprocess.h", "platform/thread_annotations.h", "platform/types.h", ], @@ -1539,6 +1540,7 @@ tf_cc_tests( "platform/net_test.cc", "platform/port_test.cc", "platform/profile_utils/cpu_utils_test.cc", + "platform/subprocess_test.cc", ], deps = [ ":lib", diff --git a/tensorflow/core/distributed_runtime/rpc/grpc_testlib.h b/tensorflow/core/distributed_runtime/rpc/grpc_testlib.h index cfaf4c0da99..5e81b901894 100644 --- a/tensorflow/core/distributed_runtime/rpc/grpc_testlib.h +++ b/tensorflow/core/distributed_runtime/rpc/grpc_testlib.h @@ -60,7 +60,7 @@ class TestCluster { private: TestCluster() = default; - std::vector> subprocesses_; + std::vector> subprocesses_; std::vector targets_; std::vector devices_; diff --git a/tensorflow/core/platform/posix/subprocess.cc b/tensorflow/core/platform/posix/subprocess.cc new file mode 100644 index 00000000000..ef646baf4d5 --- /dev/null +++ b/tensorflow/core/platform/posix/subprocess.cc @@ -0,0 +1,464 @@ +/* Copyright 2016 The TensorFlow Authors. All Rights Reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +==============================================================================*/ + +#include +#include +#include +#include +#include +#include +#include + +#include "tensorflow/core/platform/logging.h" +#include "tensorflow/core/platform/subprocess.h" + +// 1) FYI from m3b@ about fork(): +// A danger of calling fork() (as opposed to clone() or vfork()) is that if +// many people have used pthread_atfork() to acquire locks, fork() can deadlock, +// because it's unlikely that the locking order will be correct in a large +// programme where different layers are unaware of one another and using +// pthread_atfork() independently. +// +// The danger of not calling fork() is that if libc managed to use +// pthread_atfork() correctly (for example, to lock the environment), you'd +// miss out on that protection. (But as far as I can see most libc's don't get +// that right; certainly glibc doesn't seem to.) +// +// clone() or vfork() are also frustrating because clone() exists only on Linux, +// and both clone(...CLONE_VM...) and vfork() have interesting issues around +// signals being delivered after the fork and before the exec. It may be +// possible to work around the latter by blocking all signals before the fork +// and unblocking them afterwards. +// +// Fortunately, most people haven't heard of pthread_atfork(). +// +// +// 2) FYI from m3b@ about execv(): +// The execv() call implicitly uses the libc global variable environ, which was +// copied by fork(), and that copy could have raced with a setenv() call in +// another thread, since libc implementations are usually not very careful about +// this. (glibc isn't careful, for example.) +// +// If this were inside libc, we could use locks or memory barriers to avoid the +// race, but as it is, I see nothing you can do. Even if you tried to copy the +// environment before the fork(), the copying could race with other threads +// calling setenv(). The good news is that few people call setenv(). +// +// Amusingly, the standard says of fork(): "...to avoid errors, the child +// process may only execute async-signal-safe operations until such time as one +// of the exec functions is called." Notice that execve() is listed as +// async-signal-safe, but execv() is not, and the difference is just the +// handling of the environment. + +namespace tensorflow { + +SubProcess::SubProcess(int nfds) + : running_(false), pid_(-1), exec_path_(nullptr), exec_argv_(nullptr) { + // The input 'nfds' parameter is currently ignored and the internal constant + // 'kNFds' is used to support the 3 channels (stdin, stdout, stderr). + for (int i = 0; i < kNFds; i++) { + action_[i] = ACTION_CLOSE; + parent_pipe_[i] = -1; + child_pipe_[i] = -1; + } +} + +SubProcess::~SubProcess() { + mutex_lock procLock(proc_mu_); + mutex_lock dataLock(data_mu_); + pid_ = -1; + running_ = false; + FreeArgs(); + ClosePipes(); +} + +void SubProcess::FreeArgs() { + free(exec_path_); + exec_path_ = nullptr; + + if (exec_argv_) { + for (char** p = exec_argv_; *p != nullptr; p++) { + free(*p); + } + delete[] exec_argv_; + exec_argv_ = nullptr; + } +} + +void SubProcess::ClosePipes() { + for (int i = 0; i < kNFds; i++) { + if (parent_pipe_[i] >= 0) { + close(parent_pipe_[i]); + parent_pipe_[i] = -1; + } + if (child_pipe_[i] >= 0) { + close(child_pipe_[i]); + child_pipe_[i] = -1; + } + } +} + +void SubProcess::SetProgram(const string& file, + const std::vector& argv) { + mutex_lock procLock(proc_mu_); + mutex_lock dataLock(data_mu_); + if (running_) { + LOG(FATAL) << "SetProgram called after the process was started."; + return; + } + + FreeArgs(); + exec_path_ = strdup(file.c_str()); + if (exec_path_ == nullptr) { + LOG(FATAL) << "SetProgram failed to allocate file string."; + return; + } + + int argc = argv.size(); + exec_argv_ = new char*[argc + 1]; + for (int i = 0; i < argc; i++) { + exec_argv_[i] = strdup(argv[i].c_str()); + if (exec_argv_[i] == nullptr) { + LOG(FATAL) << "SetProgram failed to allocate command argument."; + return; + } + } + exec_argv_[argc] = nullptr; +} + +void SubProcess::SetChannelAction(Channel chan, ChannelAction action) { + mutex_lock procLock(proc_mu_); + mutex_lock dataLock(data_mu_); + if (running_) { + LOG(FATAL) << "SetChannelAction called after the process was started."; + } else if (!chan_valid(chan)) { + LOG(FATAL) << "SetChannelAction called with invalid channel: " << chan; + } else if ((action != ACTION_CLOSE) && (action != ACTION_PIPE) && + (action != ACTION_DUPPARENT)) { + LOG(FATAL) << "SetChannelAction called with invalid action: " << action; + } else { + action_[chan] = action; + } +} + +bool SubProcess::Start() { + mutex_lock procLock(proc_mu_); + mutex_lock dataLock(data_mu_); + if (running_) { + LOG(ERROR) << "Start called after the process was started."; + return false; + } + if ((exec_path_ == nullptr) || (exec_argv_ == nullptr)) { + LOG(ERROR) << "Start called without setting a program."; + return false; + } + + // Create parent/child pipes for the specified channels and make the + // parent-side of the pipes non-blocking. + for (int i = 0; i < kNFds; i++) { + if (action_[i] == ACTION_PIPE) { + int pipe_fds[2]; + if (pipe(pipe_fds) < 0) { + LOG(ERROR) << "Start cannot create pipe: " << strerror(errno); + ClosePipes(); + return false; + } + // Handle the direction of the pipe (stdin vs stdout/err). + if (i == 0) { + parent_pipe_[i] = pipe_fds[1]; + child_pipe_[i] = pipe_fds[0]; + } else { + parent_pipe_[i] = pipe_fds[0]; + child_pipe_[i] = pipe_fds[1]; + } + + if (fcntl(parent_pipe_[i], F_SETFL, O_NONBLOCK) < 0) { + LOG(ERROR) << "Start cannot make pipe non-blocking: " + << strerror(errno); + ClosePipes(); + return false; + } + if (fcntl(parent_pipe_[i], F_SETFD, FD_CLOEXEC) < 0) { + LOG(ERROR) << "Start cannot make pipe close-on-exec: " + << strerror(errno); + ClosePipes(); + return false; + } + } + } + + // Start the child process and setup the file descriptors of both processes. + // See comment (1) in the header about issues with the use of fork(). + pid_ = fork(); + if (pid_ < 0) { + LOG(ERROR) << "Start cannot fork() child process: " << strerror(errno); + ClosePipes(); + return false; + } + + if (pid_ > 0) { + // Parent process: close the child-side pipes and return. + running_ = true; + for (int i = 0; i < kNFds; i++) { + if (child_pipe_[i] >= 0) { + close(child_pipe_[i]); + child_pipe_[i] = -1; + } + } + return true; + } + + // Child process: close parent-side pipes and channels marked for closing. + // For pipe channels, replace their file descriptors with the pipes. + int devnull_fd = -1; + for (int i = 0; i < kNFds; i++) { + if (parent_pipe_[i] >= 0) { + close(parent_pipe_[i]); + parent_pipe_[i] = -1; + } + + switch (action_[i]) { + case ACTION_DUPPARENT: + // Nothing to do, fork() took care of it. + break; + + case ACTION_PIPE: + while (dup2(child_pipe_[i], i) < 0) { + if (!retry(errno)) { + _exit(1); + } + } + close(child_pipe_[i]); + child_pipe_[i] = -1; + break; + + case ACTION_CLOSE: + default: + // Do not close stdin/out/err, instead redirect them to /dev/null so + // their file descriptors remain unavailable for reuse by open(), etc. + if (i <= CHAN_STDERR) { + if (devnull_fd < 0) { + while ((devnull_fd = open("/dev/null", O_RDWR, 0)) < 0) { + if (!retry(errno)) { + _exit(1); + } + } + } + while (dup2(devnull_fd, i) < 0) { + if (!retry(errno)) { + _exit(1); + } + } + } else { + close(i); + } + break; + } + } + + if (devnull_fd >= 0) { + close(devnull_fd); + } + + // Execute the child program. + // See comment (2) in the header about issues with the use of execv(). + execv(exec_path_, exec_argv_); + _exit(1); +} + +bool SubProcess::Wait() { + int status; + return WaitInternal(&status); +} + +bool SubProcess::WaitInternal(int* status) { + // The waiter must release proc_mu_ while waiting in order for Kill() to work. + proc_mu_.lock(); + bool running = running_; + pid_t pid = pid_; + proc_mu_.unlock(); + + bool ret = false; + if (running && (pid > 1)) { + pid_t cpid; + int cstat; + bool done = false; + while (!done) { + cpid = waitpid(pid, &cstat, 0); + if ((cpid < 0) && !retry(errno)) { + done = true; + } else if ((cpid == pid) && (WIFEXITED(cstat) || WIFSIGNALED(cstat))) { + *status = cstat; + ret = true; + done = true; + } + } + } + + proc_mu_.lock(); + if ((running_ == running) && (pid_ == pid)) { + running_ = false; + pid_ = -1; + } + proc_mu_.unlock(); + return ret; +} + +bool SubProcess::Kill(int signal) { + proc_mu_.lock(); + bool running = running_; + pid_t pid = pid_; + proc_mu_.unlock(); + + bool ret = false; + if (running && (pid > 1)) { + ret = (kill(pid, signal) == 0); + } + return ret; +} + +int SubProcess::Communicate(const string* stdin_input, string* stdout_output, + string* stderr_output) { + struct pollfd fds[kNFds]; + size_t nbytes[kNFds]; + string* iobufs[kNFds]; + int fd_count = 0; + + proc_mu_.lock(); + bool running = running_; + proc_mu_.unlock(); + if (!running) { + LOG(ERROR) << "Communicate called without a running process."; + return 1; + } + + // If SIGPIPE handling is the default action, change it to ignore SIGPIPE and + // keep it ignored, don't change it back. This is needed while communicating + // with the child process so the parent process can survive the death of the + // child process while it is writing to its stdin. If the application has + // registered a SIGPIPE handler, then let it deal with any signals generated + // by the premature death of the child process, don't overwrite its handler. + struct sigaction act; + if (sigaction(SIGPIPE, nullptr, &act) < 0) { + LOG(ERROR) << "Communicate cannot get SIGPIPE handler: " << strerror(errno); + return 1; + } + if (act.sa_handler == SIG_DFL) { + memset(&act, 0, sizeof(act)); + act.sa_handler = SIG_IGN; + sigemptyset(&act.sa_mask); + if (sigaction(SIGPIPE, &act, nullptr) < 0) { + LOG(ERROR) << "Communicate cannot ignore SIGPIPE: " << strerror(errno); + return 1; + } + } + + // Lock data_mu_ but not proc_mu_ while communicating with the child process + // in order for Kill() to be able to terminate the child from another thread. + data_mu_.lock(); + + // Initialize the poll() structures and buffer tracking. + for (int i = 0; i < kNFds; i++) { + if (action_[i] == ACTION_PIPE) { + switch (i) { + case CHAN_STDIN: + // Special case: if no data is given to send to the child process, + // close the pipe to unblock the child, and skip the file descriptor. + if (stdin_input == nullptr) { + close(parent_pipe_[i]); + parent_pipe_[i] = -1; + continue; + } + iobufs[fd_count] = const_cast(stdin_input); + break; + case CHAN_STDOUT: + iobufs[fd_count] = stdout_output; + break; + case CHAN_STDERR: + iobufs[fd_count] = stderr_output; + break; + default: + iobufs[fd_count] = nullptr; + break; + } + nbytes[fd_count] = 0; + fds[fd_count].fd = parent_pipe_[i]; + fds[fd_count].events = (i > 0) ? POLLIN : POLLOUT; + fds[fd_count].revents = 0; + fd_count++; + } + } + + // Loop communicating with the child process. + int fd_remain = fd_count; + char buf[4096]; + while (fd_remain > 0) { + int n = poll(fds, fd_count, -1); + if ((n < 0) && !retry(errno)) { + LOG(ERROR) << "Communicate cannot poll(): " << strerror(errno); + fd_remain = 0; + } else if (n == 0) { + LOG(ERROR) << "Communicate cannot poll(): timeout not possible"; + fd_remain = 0; + } else if (n > 0) { + // Handle the pipes ready for I/O. + for (int i = 0; i < fd_count; i++) { + if ((fds[i].revents & (POLLIN | POLLHUP)) != 0) { + // Read from one of the child's outputs. + ssize_t n = read(fds[i].fd, buf, sizeof(buf)); + if (n > 0) { + if (iobufs[i] != nullptr) { + iobufs[i]->append(buf, n); + nbytes[i] += n; + } + } else if ((n == 0) || !retry(errno)) { + fds[i].fd = -1; + fd_remain--; + } + } else if ((fds[i].revents & POLLOUT) != 0) { + // Write to the child's stdin. + ssize_t n = iobufs[i]->size() - nbytes[i]; + if (n > 0) { + n = write(fds[i].fd, iobufs[i]->c_str() + nbytes[i], n); + } + if (n >= 0) { + nbytes[i] += n; + if (nbytes[i] >= iobufs[i]->size()) { + fds[i].fd = -1; + fd_remain--; + // Close the child's stdin pipe to unblock the process. + close(parent_pipe_[CHAN_STDIN]); + parent_pipe_[CHAN_STDIN] = -1; + } + } else if (!retry(errno)) { + fds[i].fd = -1; + fd_remain--; + } + } else if ((fds[i].revents & POLLERR) != 0) { + fds[i].fd = -1; + fd_remain--; + } + } + } + } + + data_mu_.unlock(); + + // Wait for the child process to exit and return its status. + int status; + return WaitInternal(&status) ? status : -1; +} + +} // namespace tensorflow diff --git a/tensorflow/core/platform/posix/subprocess.h b/tensorflow/core/platform/posix/subprocess.h new file mode 100644 index 00000000000..53f95f3c14e --- /dev/null +++ b/tensorflow/core/platform/posix/subprocess.h @@ -0,0 +1,131 @@ +/* Copyright 2016 The TensorFlow Authors. All Rights Reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +==============================================================================*/ + +#ifndef TENSORFLOW_PLATFORM_DEFAULT_SUBPROCESS_H_ +#define TENSORFLOW_PLATFORM_DEFAULT_SUBPROCESS_H_ + +#include +#include + +#include +#include + +#include "tensorflow/core/platform/macros.h" +#include "tensorflow/core/platform/mutex.h" +#include "tensorflow/core/platform/types.h" + +namespace tensorflow { + +class SubProcess { + public: + // SubProcess() + // nfds: The number of file descriptors to use. + explicit SubProcess(int nfds = 3); + + // Virtual for backwards compatibility; do not create new subclasses. + // It is illegal to delete the SubProcess within its exit callback. + virtual ~SubProcess(); + + // SetChannelAction() + // Set how to handle a channel. The default action is ACTION_CLOSE. + // The action is set for all subsequent processes, until SetChannel() + // is called again. + // + // SetChannel may not be called while the process is running. + // + // chan: Which channel this applies to. + // action: What to do with the channel. + // Virtual for backwards compatibility; do not create new subclasses. + virtual void SetChannelAction(Channel chan, ChannelAction action); + + // SetProgram() + // Set up a program and argument list for execution, with the full + // "raw" argument list passed as a vector of strings. argv[0] + // should be the program name, just as in execv(). + // + // file: The file containing the program. This must be an absolute path + // name - $PATH is not searched. + // argv: The argument list. + virtual void SetProgram(const string& file, const std::vector& argv); + + // Start() + // Run the command that was previously set up with SetProgram(). + // The following are fatal programming errors: + // * Attempting to start when a process is already running. + // * Attempting to start without first setting the command. + // Note, however, that Start() does not try to validate that the binary + // does anything reasonable (e.g. exists or can execute); as such, you can + // specify a non-existent binary and Start() will still return true. You + // will get a failure from the process, but only after Start() returns. + // + // Return true normally, or false if the program couldn't be started + // because of some error. + // Virtual for backwards compatibility; do not create new subclasses. + virtual bool Start(); + + // Kill() + // Send the given signal to the process. + // Return true normally, or false if we couldn't send the signal - likely + // because the process doesn't exist. + virtual bool Kill(int signal); + + // Wait() + // Block until the process exits. + // Return true normally, or false if the process wasn't running. + virtual bool Wait(); + + // Communicate() + // Read from stdout and stderr and writes to stdin until all pipes have + // closed, then waits for the process to exit. + // Note: Do NOT call Wait() after calling Communicate as it will always + // fail, since Communicate calls Wait() internally. + // 'stdin_input', 'stdout_output', and 'stderr_output' may be NULL. + // If this process is not configured to send stdout or stderr to pipes, + // the output strings will not be modified. + // If this process is not configured to take stdin from a pipe, stdin_input + // will be ignored. + // Returns the command's exit status. + virtual int Communicate(const string* stdin_input, string* stdout_output, + string* stderr_output); + + private: + static const int kNFds = 3; + static bool chan_valid(int chan) { return ((chan >= 0) && (chan < kNFds)); } + static bool retry(int e) { + return ((e == EINTR) || (e == EAGAIN) || (e == EWOULDBLOCK)); + } + void FreeArgs() EXCLUSIVE_LOCKS_REQUIRED(data_mu_); + void ClosePipes() EXCLUSIVE_LOCKS_REQUIRED(data_mu_); + bool WaitInternal(int* status); + + // The separation between proc_mu_ and data_mu_ mutexes allows Kill() to be + // called by a thread while another thread is inside Wait() or Communicate(). + mutable mutex proc_mu_; + bool running_ GUARDED_BY(proc_mu_); + pid_t pid_ GUARDED_BY(proc_mu_); + + mutable mutex data_mu_ ACQUIRED_AFTER(proc_mu_); + char* exec_path_ GUARDED_BY(data_mu_); + char** exec_argv_ GUARDED_BY(data_mu_); + ChannelAction action_[kNFds] GUARDED_BY(data_mu_); + int parent_pipe_[kNFds] GUARDED_BY(data_mu_); + int child_pipe_[kNFds] GUARDED_BY(data_mu_); + + TF_DISALLOW_COPY_AND_ASSIGN(SubProcess); +}; + +} // namespace tensorflow + +#endif // TENSORFLOW_PLATFORM_DEFAULT_SUBPROCESS_H_ diff --git a/tensorflow/core/platform/posix/test.cc b/tensorflow/core/platform/posix/test.cc index f83fccaa227..a69127b3e88 100644 --- a/tensorflow/core/platform/posix/test.cc +++ b/tensorflow/core/platform/posix/test.cc @@ -20,62 +20,17 @@ limitations under the License. #include "tensorflow/core/lib/strings/strcat.h" #include "tensorflow/core/platform/logging.h" +#include "tensorflow/core/platform/subprocess.h" namespace tensorflow { namespace testing { -namespace { -class PosixSubProcess : public SubProcess { - public: - PosixSubProcess(const std::vector& argv) : argv_(argv), pid_(0) {} - - ~PosixSubProcess() override {} - - bool Start() override { - if (pid_ != 0) { - LOG(ERROR) << "Tried to start process multiple times."; - return false; - } - pid_ = fork(); - if (pid_ == 0) { - // We are in the child process. - const char* path = argv_[0].c_str(); - const char** argv = new const char*[argv_.size() + 1]; - int i = 0; - for (const string& arg : argv_) { - argv[i++] = arg.c_str(); - } - argv[argv_.size()] = nullptr; - execv(path, (char* const*)argv); - // Never executes. - return true; - } else if (pid_ < 0) { - LOG(ERROR) << "Failed to fork process."; - return false; - } else { - // We are in the parent process and fork() was successful. - // TODO(mrry): Consider collecting stderr from the child. - return true; - } - } - - bool Kill(int signal) override { - if (pid_ == 0) { - LOG(ERROR) << "Tried to kill process before starting it."; - return false; - } - return kill(pid_, signal) == 0; - } - - private: - const std::vector argv_; - pid_t pid_; - TF_DISALLOW_COPY_AND_ASSIGN(PosixSubProcess); -}; -} // namespace - std::unique_ptr CreateSubProcess(const std::vector& argv) { - return std::unique_ptr(new PosixSubProcess(argv)); + std::unique_ptr proc(new SubProcess()); + proc->SetProgram(argv[0], argv); + proc->SetChannelAction(CHAN_STDERR, ACTION_DUPPARENT); + proc->SetChannelAction(CHAN_STDOUT, ACTION_DUPPARENT); + return proc; } int PickUnusedPortOrDie() { return internal::PickUnusedPortOrDie(); } diff --git a/tensorflow/core/platform/subprocess.h b/tensorflow/core/platform/subprocess.h new file mode 100644 index 00000000000..7dfd38688d2 --- /dev/null +++ b/tensorflow/core/platform/subprocess.h @@ -0,0 +1,61 @@ +/* Copyright 2016 The TensorFlow Authors. All Rights Reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +==============================================================================*/ + +#ifndef TENSORFLOW_PLATFORM_SUBPROCESS_H_ +#define TENSORFLOW_PLATFORM_SUBPROCESS_H_ + +namespace tensorflow { + +// Channel identifiers. +enum Channel { + CHAN_STDIN = 0, + CHAN_STDOUT = 1, + CHAN_STDERR = 2, +}; + +// Specify how a channel is handled. +enum ChannelAction { + // Close the file descriptor when the process starts. + // This is the default behavior. + ACTION_CLOSE, + + // Make a pipe to the channel. It is used in the Communicate() method to + // transfer data between the parent and child processes. + ACTION_PIPE, + + // Duplicate the parent's file descriptor. Useful if stdout/stderr should + // go to the same place that the parent writes it. + ACTION_DUPPARENT, +}; + +// Supports spawning and killing child processes. +class SubProcess; + +} // namespace tensorflow + +#include "tensorflow/core/platform/platform.h" + +#if defined(PLATFORM_GOOGLE) +#include "tensorflow/core/platform/google/subprocess.h" +#elif defined(PLATFORM_POSIX) || defined(PLATFORM_POSIX_ANDROID) || \ + defined(PLATFORM_GOOGLE_ANDROID) +#include "tensorflow/core/platform/posix/subprocess.h" +#elif defined(PLATFORM_WINDOWS) +#error SubProcess not yet implemented for Windows +#else +#error Define the appropriate PLATFORM_ macro for this platform +#endif + +#endif // TENSORFLOW_PLATFORM_SUBPROCESS_H_ diff --git a/tensorflow/core/platform/subprocess_test.cc b/tensorflow/core/platform/subprocess_test.cc new file mode 100644 index 00000000000..3d58b011cb5 --- /dev/null +++ b/tensorflow/core/platform/subprocess_test.cc @@ -0,0 +1,184 @@ +/* Copyright 2016 The TensorFlow Authors. All Rights Reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +==============================================================================*/ + +#include + +#include "tensorflow/core/lib/core/status_test_util.h" +#include "tensorflow/core/platform/subprocess.h" +#include "tensorflow/core/platform/test.h" + +namespace tensorflow { + +class SubProcessTest : public ::testing::Test {}; + +TEST_F(SubProcessTest, NoOutputNoComm) { + tensorflow::SubProcess proc; + proc.SetProgram("/bin/cat", {"cat", "/dev/null"}); + EXPECT_TRUE(proc.Start()); + EXPECT_TRUE(proc.Wait()); +} + +TEST_F(SubProcessTest, NoOutput) { + tensorflow::SubProcess proc; + proc.SetProgram("/bin/cat", {"cat", "/dev/null"}); + proc.SetChannelAction(CHAN_STDOUT, ACTION_PIPE); + proc.SetChannelAction(CHAN_STDERR, ACTION_PIPE); + EXPECT_TRUE(proc.Start()); + + string out, err; + int status = proc.Communicate(nullptr, &out, &err); + EXPECT_TRUE(WIFEXITED(status)); + EXPECT_EQ(0, WEXITSTATUS(status)); + EXPECT_EQ("", out); + EXPECT_EQ("", err); +} + +TEST_F(SubProcessTest, Stdout) { + tensorflow::SubProcess proc; + proc.SetProgram("/bin/echo", {"echo", "-n", "hello world"}); + proc.SetChannelAction(CHAN_STDOUT, ACTION_PIPE); + proc.SetChannelAction(CHAN_STDERR, ACTION_PIPE); + EXPECT_TRUE(proc.Start()); + + string out, err; + int status = proc.Communicate(nullptr, &out, &err); + EXPECT_TRUE(WIFEXITED(status)); + EXPECT_EQ(0, WEXITSTATUS(status)); + EXPECT_EQ("hello world", out); + EXPECT_EQ("", err); +} + +TEST_F(SubProcessTest, StdoutIgnored) { + tensorflow::SubProcess proc; + proc.SetProgram("/bin/echo", {"echo", "-n", "hello world"}); + proc.SetChannelAction(CHAN_STDOUT, ACTION_PIPE); + proc.SetChannelAction(CHAN_STDERR, ACTION_PIPE); + EXPECT_TRUE(proc.Start()); + + int status = proc.Communicate(nullptr, nullptr, nullptr); + EXPECT_TRUE(WIFEXITED(status)); + EXPECT_EQ(0, WEXITSTATUS(status)); +} + +TEST_F(SubProcessTest, Stderr) { + tensorflow::SubProcess proc; + proc.SetProgram("/bin/cat", {"cat", "/file_does_not_exist"}); + proc.SetChannelAction(CHAN_STDOUT, ACTION_PIPE); + proc.SetChannelAction(CHAN_STDERR, ACTION_PIPE); + EXPECT_TRUE(proc.Start()); + + string out, err; + int status = proc.Communicate(nullptr, &out, &err); + EXPECT_TRUE(WIFEXITED(status)); + EXPECT_EQ(1, WEXITSTATUS(status)); + EXPECT_EQ("", out); + EXPECT_NE(string::npos, err.find("/file_does_not_exist")); +} + +TEST_F(SubProcessTest, StderrIgnored) { + tensorflow::SubProcess proc; + proc.SetProgram("/bin/cat", {"cat", "/file_does_not_exist"}); + proc.SetChannelAction(CHAN_STDOUT, ACTION_PIPE); + proc.SetChannelAction(CHAN_STDERR, ACTION_PIPE); + EXPECT_TRUE(proc.Start()); + + int status = proc.Communicate(nullptr, nullptr, nullptr); + EXPECT_TRUE(WIFEXITED(status)); + EXPECT_EQ(1, WEXITSTATUS(status)); +} + +TEST_F(SubProcessTest, Stdin) { + tensorflow::SubProcess proc; + proc.SetProgram("/usr/bin/wc", {"wc", "-l"}); + proc.SetChannelAction(CHAN_STDIN, ACTION_PIPE); + EXPECT_TRUE(proc.Start()); + + string in = "foobar\nbarfoo\nhaha\n"; + int status = proc.Communicate(&in, nullptr, nullptr); + EXPECT_TRUE(WIFEXITED(status)); + EXPECT_EQ(0, WEXITSTATUS(status)); +} + +TEST_F(SubProcessTest, StdinStdout) { + tensorflow::SubProcess proc; + proc.SetProgram("/usr/bin/wc", {"wc", "-l"}); + proc.SetChannelAction(CHAN_STDIN, ACTION_PIPE); + proc.SetChannelAction(CHAN_STDOUT, ACTION_PIPE); + EXPECT_TRUE(proc.Start()); + + string in = "foobar\nbarfoo\nhaha\n"; + string out; + int status = proc.Communicate(&in, &out, nullptr); + EXPECT_TRUE(WIFEXITED(status)); + EXPECT_EQ(0, WEXITSTATUS(status)); + int count = stoi(out); + EXPECT_EQ(3, count); +} + +TEST_F(SubProcessTest, StdinChildExit) { + tensorflow::SubProcess proc; + proc.SetProgram("/bin/sleep", {"sleep", "0"}); + proc.SetChannelAction(CHAN_STDIN, ACTION_PIPE); + EXPECT_TRUE(proc.Start()); + + // Verify that the parent handles the child exiting immediately as the + // parent is trying to write a large string to the child's stdin. + string in; + in.reserve(1000000); + for (int i = 0; i < 100000; i++) { + in += "hello xyz\n"; + } + + int status = proc.Communicate(&in, nullptr, nullptr); + EXPECT_TRUE(WIFEXITED(status)); + EXPECT_EQ(0, WEXITSTATUS(status)); +} + +TEST_F(SubProcessTest, StdinStdoutOverlap) { + tensorflow::SubProcess proc; + proc.SetProgram("/bin/cat", {"cat"}); + proc.SetChannelAction(CHAN_STDIN, ACTION_PIPE); + proc.SetChannelAction(CHAN_STDOUT, ACTION_PIPE); + EXPECT_TRUE(proc.Start()); + + // Verify that the parent handles multiplexed reading/writing to the child + // process. The string is large enough to exceed the buffering of the pipes. + string in; + in.reserve(1000000); + for (int i = 0; i < 100000; i++) { + in += "hello xyz\n"; + } + + string out; + int status = proc.Communicate(&in, &out, nullptr); + EXPECT_TRUE(WIFEXITED(status)); + EXPECT_EQ(0, WEXITSTATUS(status)); + EXPECT_EQ(in, out); +} + +TEST_F(SubProcessTest, KillProc) { + tensorflow::SubProcess proc; + proc.SetProgram("/bin/cat", {"cat"}); + proc.SetChannelAction(CHAN_STDIN, ACTION_PIPE); + proc.SetChannelAction(CHAN_STDOUT, ACTION_PIPE); + EXPECT_TRUE(proc.Start()); + + EXPECT_TRUE(proc.Kill(SIGKILL)); + EXPECT_TRUE(proc.Wait()); + + EXPECT_FALSE(proc.Kill(SIGKILL)); +} + +} // namespace tensorflow diff --git a/tensorflow/core/platform/test.h b/tensorflow/core/platform/test.h index 0046b6c3f1d..295957c3d80 100644 --- a/tensorflow/core/platform/test.h +++ b/tensorflow/core/platform/test.h @@ -21,6 +21,7 @@ limitations under the License. #include "tensorflow/core/platform/macros.h" #include "tensorflow/core/platform/platform.h" +#include "tensorflow/core/platform/subprocess.h" #include "tensorflow/core/platform/types.h" // As of September 2016, we continue to attempt to avoid the use of gmock aka @@ -48,28 +49,6 @@ string TensorFlowSrcRoot(); // Returns the same value for the lifetime of the process. int RandomSeed(); -// Supports spawning and killing child processes, for use in -// multi-process testing. -class SubProcess { - public: - virtual ~SubProcess() {} - - // Starts the subprocess. Returns true on success, otherwise false. - // NOTE: This method is not thread-safe. - virtual bool Start() = 0; - - // Kills the subprocess with the given signal number. Returns true - // on success, otherwise false. - // NOTE: This method is not thread-safe. - virtual bool Kill(int signal) = 0; - - protected: - SubProcess() {} - - private: - TF_DISALLOW_COPY_AND_ASSIGN(SubProcess); -}; - // Returns an object that represents a child process that will be // launched with the given command-line arguments `argv`. The process // must be explicitly started by calling the Start() method on the