From 348b3e62245149e6ed59c3d856d9a723e4d8795a Mon Sep 17 00:00:00 2001 From: Allen Lavoie Date: Thu, 11 Jun 2020 15:28:29 -0700 Subject: [PATCH] Parallel device: switch to manual threading to avoid synchronization issues with remote eager Does not create new executors for each of its threads at the moment, so I'm not sure how this will interact with a user-level async setting. It'll be easy enough to create one sync executor per thread if that's a problem. Requires some tweaks to TFE_OpAddAttrs to make it safe to call for different ops in different threads (due to protobuf arena destruction being global and not thread-safe). These changes avoid serialization+deserialization of attributes, so they make sense anyway. PiperOrigin-RevId: 315990383 Change-Id: I211ceae1eb34e490b4d2a1df76226995c7ae7929 --- tensorflow/c/eager/c_api.cc | 6 +- .../parallel_device/parallel_device_lib.cc | 271 +++++++++++++----- .../parallel_device/parallel_device_lib.h | 26 +- .../core/common_runtime/eager/attr_builder.cc | 5 + .../core/common_runtime/eager/attr_builder.h | 11 + 5 files changed, 238 insertions(+), 81 deletions(-) diff --git a/tensorflow/c/eager/c_api.cc b/tensorflow/c/eager/c_api.cc index 865f8061ae0..e71073ec79f 100644 --- a/tensorflow/c/eager/c_api.cc +++ b/tensorflow/c/eager/c_api.cc @@ -1473,14 +1473,10 @@ const TFE_OpAttrs* TFE_OpGetAttrs(TFE_Op* op) { } void TFE_OpAddAttrs(TFE_Op* op, const TFE_OpAttrs* attrs) { - tensorflow::AttrValueMap m; - tensorflow::unwrap(attrs)->FillAttrValueMap(&m); tensorflow::EagerOperation* operation = OperationFromInterface(tensorflow::unwrap(op)); tensorflow::AttrBuilder* destination = operation->MutableAttrs(); - for (const auto& attribute : m) { - destination->Set(attribute.first, attribute.second); - } + destination->CopyAttributes(*tensorflow::unwrap(attrs)); } void TFE_OpAttrsSerialize(const TFE_OpAttrs* attrs, TF_Buffer* buf, diff --git a/tensorflow/c/eager/parallel_device/parallel_device_lib.cc b/tensorflow/c/eager/parallel_device/parallel_device_lib.cc index 225e9a3a12d..98cd4812610 100644 --- a/tensorflow/c/eager/parallel_device/parallel_device_lib.cc +++ b/tensorflow/c/eager/parallel_device/parallel_device_lib.cc @@ -16,6 +16,8 @@ limitations under the License. #include "tensorflow/c/eager/parallel_device/parallel_device_lib.h" #include "tensorflow/core/lib/gtl/cleanup.h" +#include "tensorflow/core/platform/env.h" +#include "tensorflow/core/platform/mutex.h" namespace tensorflow { namespace parallel_device { @@ -28,21 +30,198 @@ class OpDeleter { using OpPtr = std::unique_ptr; -// Creates a vector of `count` new executors (threads). -std::vector MakeExecutors(size_t count) { - std::vector executors; - executors.reserve(count); - for (int i = 0; i < count; ++i) { - executors.emplace_back(TFE_NewExecutor(true /* is_async */)); - } - return executors; -} +class StatusDeleter { + public: + void operator()(TF_Status* to_delete) const { TF_DeleteStatus(to_delete); } +}; + +using StatusPtr = std::unique_ptr; } // namespace +// Allows a single op at a time to be launched without blocking. +// +// DeviceThread itself is thread-safe, in that StartExecute will block if there +// is a pending execution. Since StartExecute is equivalent to grabbing a lock, +// multiple DeviceThreads should always be accessed in the same order to avoid +// deadlocks. +class DeviceThread { + public: + // Starts a background thread waiting for `StartExecute`. + explicit DeviceThread(const std::string& device) + : status_(TF_NewStatus()), + device_(device), + op_(nullptr), + thread_(tensorflow::Env::Default()->StartThread( + tensorflow::ThreadOptions(), "parallel_device_execute", + std::bind(&DeviceThread::Run, this))) {} + ~DeviceThread(); + + // Requests that the worker thread execute the specified operation. Blocks + // until the previously pending operation (a StartExecute without a Join) has + // finished, if any. + void StartExecute(TFE_Context* context, const char* operation_name, + std::vector inputs, + const TFE_OpAttrs* attributes, int expected_max_outputs); + // Block until the previous `StartExecute` operation has executed. Forwards + // the status from `TFE_Execute` and returns outputs if the status is OK. + std::vector Join(TF_Status* status); + + private: + void Run(); + + void Execute(TFE_Context* context, const char* operation_name, + std::vector inputs, + const TFE_OpAttrs* attributes, int expected_max_outputs, + std::vector* outputs, TF_Status* status) const + TF_EXCLUSIVE_LOCKS_REQUIRED(execution_mutex_); + + enum class ExecutionState { + kReadyToExecute, + kHasResult, + kIdle, + kShuttingDown, + }; + + tensorflow::mutex execution_mutex_; + ExecutionState execution_state_ TF_GUARDED_BY(execution_mutex_) = + ExecutionState::kIdle; + // Tells the worker thread that there is new work. + tensorflow::condition_variable start_execute_; + // The worker thread notifies that work has finished. + tensorflow::condition_variable finished_execute_; + // Notifies a StartExecute that the previous Join has finished. + tensorflow::condition_variable finished_join_; + + // Temporary state between `StartExecute` and `Join`. + // Inputs + TFE_Context* context_ TF_GUARDED_BY(execution_mutex_); + const char* operation_name_ TF_GUARDED_BY(execution_mutex_); + std::vector op_inputs_ TF_GUARDED_BY(execution_mutex_); + const TFE_OpAttrs* attributes_ TF_GUARDED_BY(execution_mutex_); + int expected_max_outputs_ TF_GUARDED_BY(execution_mutex_); + // Outputs + std::vector op_outputs_ TF_GUARDED_BY(execution_mutex_); + StatusPtr status_ TF_GUARDED_BY(execution_mutex_); + + const std::string device_; + mutable OpPtr op_ TF_GUARDED_BY(execution_mutex_); + std::unique_ptr thread_; +}; + +DeviceThread::~DeviceThread() { + { + tensorflow::mutex_lock l(execution_mutex_); + execution_state_ = ExecutionState::kShuttingDown; + } + start_execute_.notify_one(); +} + +void DeviceThread::Run() { + while (true) { + { + tensorflow::mutex_lock l(execution_mutex_); + while (execution_state_ == ExecutionState::kIdle || + execution_state_ == ExecutionState::kHasResult) { + start_execute_.wait(l); + } + if (execution_state_ == ExecutionState::kShuttingDown) { + return; + } else if (execution_state_ == ExecutionState::kReadyToExecute) { + // op_outputs_ may have been std::moved + op_outputs_ = std::vector(); + Execute(context_, operation_name_, std::move(op_inputs_), attributes_, + expected_max_outputs_, &op_outputs_, status_.get()); + execution_state_ = ExecutionState::kHasResult; + } + } + finished_execute_.notify_one(); + } +} + +void DeviceThread::StartExecute(TFE_Context* context, + const char* operation_name, + std::vector inputs, + const TFE_OpAttrs* attributes, + int expected_max_outputs) { + { + tensorflow::mutex_lock l(execution_mutex_); + while (execution_state_ != ExecutionState::kIdle) { + // If there's already a pending execution, wait until Join finishes before + // starting on the next operation. + finished_join_.wait(l); + } + context_ = context; + operation_name_ = operation_name; + op_inputs_ = inputs; + attributes_ = attributes; + expected_max_outputs_ = expected_max_outputs; + execution_state_ = ExecutionState::kReadyToExecute; + } + start_execute_.notify_one(); +} + +std::vector DeviceThread::Join(TF_Status* status) { + std::vector result; + { + tensorflow::mutex_lock l(execution_mutex_); + while (execution_state_ != ExecutionState::kHasResult) { + finished_execute_.wait(l); + } + if (TF_GetCode(status_.get()) != TF_OK) { + TF_SetStatus(status, TF_GetCode(status_.get()), + TF_Message(status_.get())); + } + execution_state_ = ExecutionState::kIdle; + result = std::move(op_outputs_); + } + finished_join_.notify_one(); + return result; +} + +void DeviceThread::Execute(TFE_Context* context, const char* operation_name, + std::vector inputs, + const TFE_OpAttrs* attributes, + int expected_max_outputs, + std::vector* outputs, + TF_Status* status) const { + if (op_ == nullptr) { + op_.reset(TFE_NewOp(context, operation_name, status)); + if (TF_GetCode(status) != TF_OK) return; + TFE_OpSetDevice(op_.get(), device_.c_str(), status); + if (TF_GetCode(status) != TF_OK) return; + } else { + TFE_OpReset(op_.get(), operation_name, device_.c_str(), status); + if (TF_GetCode(status) != TF_OK) return; + } + TFE_OpAddAttrs(op_.get(), attributes); + for (int input_index = 0; input_index < inputs.size(); ++input_index) { + TFE_OpAddInput(op_.get(), inputs[input_index], status); + if (TF_GetCode(status) != TF_OK) return; + } + std::vector unwrapped_results(expected_max_outputs); + int real_num_outputs = expected_max_outputs; + if (TF_GetCode(status) != TF_OK) return; + TFE_Execute(op_.get(), unwrapped_results.data(), &real_num_outputs, status); + if (TF_GetCode(status) != TF_OK) return; + unwrapped_results.resize(real_num_outputs); + outputs->reserve(real_num_outputs); + for (TFE_TensorHandle* unwrapped_result : unwrapped_results) { + outputs->emplace_back(unwrapped_result); + } +} + ParallelDevice::ParallelDevice(const std::vector& devices) - : underlying_devices_(devices), - executors_(MakeExecutors(underlying_devices_.size())) {} + : underlying_devices_(devices) { + device_threads_.reserve(devices.size()); + for (int device_index = 0; device_index < devices.size(); ++device_index) { + device_threads_.emplace_back( + new DeviceThread(devices[device_index].c_str())); + } +} + +// Necessary for a unique_ptr to a forward-declared type. +ParallelDevice::~ParallelDevice() = default; std::unique_ptr ParallelDevice::CopyToParallelDevice( TFE_Context* context, TFE_TensorHandle* tensor, TF_Status* status) const { @@ -108,72 +287,34 @@ ParallelDevice::Execute(TFE_Context* context, // Compute per-device per-output tensors std::vector> per_device_output_tensors; per_device_output_tensors.reserve(underlying_devices_.size()); - // TODO(allenl): Add a TFE_ExecuteWithExecutor API so we don't have to keep - // setting the thread-local executor like this. - TFE_Executor* previous_executor(TFE_ContextGetExecutorForThread(context)); - auto reset_executor = - tensorflow::gtl::MakeCleanup([context, previous_executor]() { - TFE_ContextSetExecutorForThread(context, previous_executor); - TFE_DeleteExecutor(previous_executor); - }); - int first_op_output_count; + int first_op_output_count = 0; for (int device_index = 0; device_index < underlying_devices_.size(); ++device_index) { - TFE_Executor* executor = executors_[device_index].get(); - // Note that the `reset_executor` cleanup sets the thread's executor back to - // the value before this function ran. - TFE_ContextSetExecutorForThread(context, executor); - OpPtr op(TFE_NewOp(context, operation_name, status)); - if (TF_GetCode(status) != TF_OK) return result; - TFE_OpSetDevice(op.get(), underlying_devices_[device_index].c_str(), - status); - TFE_OpAddAttrs(op.get(), attributes); + DeviceThread* device_thread = device_threads_[device_index].get(); + std::vector device_inputs; + device_inputs.reserve(device_inputs.size()); for (int input_index = 0; input_index < inputs.size(); ++input_index) { // Parallel tensors are divided between operations by device. - TFE_OpAddInput(op.get(), inputs[input_index]->tensor(device_index), - status); - if (TF_GetCode(status) != TF_OK) return result; + device_inputs.push_back(inputs[input_index]->tensor(device_index)); } - std::vector op_outputs(expected_max_outputs); - int real_num_outputs = expected_max_outputs; - // For nested devices, the inner device sees the async executor we've - // set. Inner parallel devices will just overwrite this with their own and - // then set it back to ours before returning. This means parallel devices - // which consist of several aliased parallel devices would hypothetically - // deadlock if the outer parallel device ran one collective with a group - // size equal to the total number of aliased physical devices. Currently - // physical devices cannot participate in a single collective reduction - // multiple times, so this would fail earlier. - // - // TODO(allenl): Keep a map from outer executor to list of inner executors - // rather than a single list of executors so aliased nested parallel devices - // don't re-use an executor. - TFE_Execute(op.get(), op_outputs.data(), &real_num_outputs, status); + device_thread->StartExecute(context, operation_name, + std::move(device_inputs), attributes, + expected_max_outputs); + } + for (int device_index = 0; device_index < underlying_devices_.size(); + ++device_index) { + DeviceThread* device_thread = device_threads_[device_index].get(); + per_device_output_tensors.push_back(device_thread->Join(status)); + if (TF_GetCode(status) != TF_OK) return result; if (device_index == 0) { - first_op_output_count = real_num_outputs; + first_op_output_count = per_device_output_tensors.rbegin()->size(); } else { - if (real_num_outputs != first_op_output_count) { + if (per_device_output_tensors.rbegin()->size() != first_op_output_count) { TF_SetStatus(status, TF_INTERNAL, "Parallel ops produced different numbers of tensors."); return result; } } - if (TF_GetCode(status) != TF_OK) return result; - std::vector this_outputs; - this_outputs.reserve(real_num_outputs); - for (int output_num = 0; output_num < real_num_outputs; ++output_num) { - this_outputs.emplace_back(op_outputs[output_num]); - } - per_device_output_tensors.push_back(std::move(this_outputs)); - } - for (int device_index = 0; device_index < underlying_devices_.size(); - ++device_index) { - TFE_Executor* executor = executors_[device_index].get(); - // TODO(b/157523095): Syncing the executor here shouldn't be - // necessary. Currently async+remote is missing cross-executor - // coordination. - TFE_ExecutorWaitForAllPendingNodes(executor, status); - if (TF_GetCode(status) != TF_OK) return result; } // For each output of the original operation, pack the per-device // TensorHandles we've computed into a single parallel TensorHandle. diff --git a/tensorflow/c/eager/parallel_device/parallel_device_lib.h b/tensorflow/c/eager/parallel_device/parallel_device_lib.h index 8f3f613e535..cbfea31d95f 100644 --- a/tensorflow/c/eager/parallel_device/parallel_device_lib.h +++ b/tensorflow/c/eager/parallel_device/parallel_device_lib.h @@ -41,16 +41,8 @@ class TensorHandleDeleter { using TensorHandlePtr = std::unique_ptr; -class ExecutorDeleter { - public: - void operator()(TFE_Executor* to_delete) const { - TFE_DeleteExecutor(to_delete); - } -}; - -using ExecutorPtr = std::unique_ptr; - class ParallelTensor; +class DeviceThread; // Forwards operations to `devices`, maintaining ParallelTensor with components // placed on each underlying device. @@ -58,6 +50,8 @@ class ParallelDevice { public: explicit ParallelDevice(const std::vector& devices); + ~ParallelDevice(); + // Helper to copy a tensor handle from another device once for each component // of the ParallelDevice. // @@ -94,9 +88,19 @@ class ParallelDevice { // A sequence of device names, indicating which devices replicated operations // are forwarded to. const std::vector underlying_devices_; - // A sequence of TFE_Executors, one per device, for executing operations in + // A sequence of thread wrappers, one per device, for executing operations in // parallel. - const std::vector executors_; + // + // Conceptually this is a thread pool with one thread per device. It requires + // less synchronization than a thread pool would for this task, since Execute + // acquires each thread in order (and so only one Execute will schedule + // blocking collective operations at a time), and avoids some dynamic + // allocation/scheduling. + // + // TODO(allenl): Keep a map from outer thread to list of inner threads rather + // than a single list of threads so aliased nested parallel devices don't + // re-use a thread. + std::vector> device_threads_; }; // Contains a tuple of tensors, one on each of the `underlying_devices_` of the diff --git a/tensorflow/core/common_runtime/eager/attr_builder.cc b/tensorflow/core/common_runtime/eager/attr_builder.cc index 9973dcc9051..2d85b2f764d 100644 --- a/tensorflow/core/common_runtime/eager/attr_builder.cc +++ b/tensorflow/core/common_runtime/eager/attr_builder.cc @@ -224,6 +224,11 @@ const NodeDef& AttrBuilder::BuildNodeDef() { return node_def_; } +void AttrBuilder::CopyAttributes(const AttrBuilder& other) { + encoded_attrs_.insert(other.encoded_attrs_.begin(), + other.encoded_attrs_.end()); +} + Status AttrTypeByName(const AttrTypeMap& m, const string& attr_name, TF_AttrType* out, unsigned char* is_list) { auto* t = gtl::FindOrNull(m, attr_name); diff --git a/tensorflow/core/common_runtime/eager/attr_builder.h b/tensorflow/core/common_runtime/eager/attr_builder.h index b2d38080102..38f6c737d8a 100644 --- a/tensorflow/core/common_runtime/eager/attr_builder.h +++ b/tensorflow/core/common_runtime/eager/attr_builder.h @@ -83,6 +83,11 @@ Status AttrTypeByName(const AttrTypeMap& m, const string& attr_name, // of the NodeDef till BuildNodeDef is called, or Set is called with certain // uncommon types (see template specializations of Set to see which types // trigger a NodeDef creation). +// +// Setting attributes via `Set` may cause arena-allocated protocol buffer +// messages to be destructed, which is not thread safe. This means that it is +// currently not safe to set attributes on *different* AttrBuilder objects from +// multiple threads. This does not apply to `CopyAttributes`. class AttrBuilder { public: AttrBuilder() {} @@ -150,6 +155,12 @@ class AttrBuilder { void FillAttrValueMapWithoutDefaults(AttrValueMap* m) const; const NodeDef& BuildNodeDef(); + // Transfers the attributes from `other` to this AttrBuilder. Does not + // overwrite existing attributes. Since it does not require deserializing and + // re-serializing attributes, it is much more efficient than going through an + // AttrValueMap. + void CopyAttributes(const AttrBuilder& other); + private: tensorflow::Fprint128 BuildCacheKeyForDevice(const StringPiece device) const;