diff --git a/tensorflow/c/eager/c_api.cc b/tensorflow/c/eager/c_api.cc
index 865f8061ae0..e71073ec79f 100644
--- a/tensorflow/c/eager/c_api.cc
+++ b/tensorflow/c/eager/c_api.cc
@@ -1473,14 +1473,10 @@ const TFE_OpAttrs* TFE_OpGetAttrs(TFE_Op* op) {
 }
 
 void TFE_OpAddAttrs(TFE_Op* op, const TFE_OpAttrs* attrs) {
-  tensorflow::AttrValueMap m;
-  tensorflow::unwrap(attrs)->FillAttrValueMap(&m);
   tensorflow::EagerOperation* operation =
       OperationFromInterface(tensorflow::unwrap(op));
   tensorflow::AttrBuilder* destination = operation->MutableAttrs();
-  for (const auto& attribute : m) {
-    destination->Set(attribute.first, attribute.second);
-  }
+  destination->CopyAttributes(*tensorflow::unwrap(attrs));
 }
 
 void TFE_OpAttrsSerialize(const TFE_OpAttrs* attrs, TF_Buffer* buf,
diff --git a/tensorflow/c/eager/parallel_device/parallel_device_lib.cc b/tensorflow/c/eager/parallel_device/parallel_device_lib.cc
index 225e9a3a12d..98cd4812610 100644
--- a/tensorflow/c/eager/parallel_device/parallel_device_lib.cc
+++ b/tensorflow/c/eager/parallel_device/parallel_device_lib.cc
@@ -16,6 +16,8 @@ limitations under the License.
 #include "tensorflow/c/eager/parallel_device/parallel_device_lib.h"
 
 #include "tensorflow/core/lib/gtl/cleanup.h"
+#include "tensorflow/core/platform/env.h"
+#include "tensorflow/core/platform/mutex.h"
 
 namespace tensorflow {
 namespace parallel_device {
@@ -28,21 +30,198 @@ class OpDeleter {
 
 using OpPtr = std::unique_ptr<TFE_Op, OpDeleter>;
 
-// Creates a vector of `count` new executors (threads).
-std::vector<ExecutorPtr> MakeExecutors(size_t count) {
-  std::vector<ExecutorPtr> executors;
-  executors.reserve(count);
-  for (int i = 0; i < count; ++i) {
-    executors.emplace_back(TFE_NewExecutor(true /* is_async */));
-  }
-  return executors;
-}
+class StatusDeleter {
+ public:
+  void operator()(TF_Status* to_delete) const { TF_DeleteStatus(to_delete); }
+};
+
+using StatusPtr = std::unique_ptr<TF_Status, StatusDeleter>;
 
 }  // namespace
 
+// Allows a single op at a time to be launched without blocking.
+//
+// DeviceThread itself is thread-safe, in that StartExecute will block if there
+// is a pending execution. Since StartExecute is equivalent to grabbing a lock,
+// multiple DeviceThreads should always be accessed in the same order to avoid
+// deadlocks.
+class DeviceThread {
+ public:
+  // Starts a background thread waiting for `StartExecute`.
+  explicit DeviceThread(const std::string& device)
+      : status_(TF_NewStatus()),
+        device_(device),
+        op_(nullptr),
+        thread_(tensorflow::Env::Default()->StartThread(
+            tensorflow::ThreadOptions(), "parallel_device_execute",
+            std::bind(&DeviceThread::Run, this))) {}
+  ~DeviceThread();
+
+  // Requests that the worker thread execute the specified operation. Blocks
+  // until the previously pending operation (a StartExecute without a Join) has
+  // finished, if any.
+  void StartExecute(TFE_Context* context, const char* operation_name,
+                    std::vector<TFE_TensorHandle*> inputs,
+                    const TFE_OpAttrs* attributes, int expected_max_outputs);
+  // Block until the previous `StartExecute` operation has executed. Forwards
+  // the status from `TFE_Execute` and returns outputs if the status is OK.
+  std::vector<TensorHandlePtr> Join(TF_Status* status);
+
+ private:
+  void Run();
+
+  void Execute(TFE_Context* context, const char* operation_name,
+               std::vector<TFE_TensorHandle*> inputs,
+               const TFE_OpAttrs* attributes, int expected_max_outputs,
+               std::vector<TensorHandlePtr>* outputs, TF_Status* status) const
+      TF_EXCLUSIVE_LOCKS_REQUIRED(execution_mutex_);
+
+  enum class ExecutionState {
+    kReadyToExecute,
+    kHasResult,
+    kIdle,
+    kShuttingDown,
+  };
+
+  tensorflow::mutex execution_mutex_;
+  ExecutionState execution_state_ TF_GUARDED_BY(execution_mutex_) =
+      ExecutionState::kIdle;
+  // Tells the worker thread that there is new work.
+  tensorflow::condition_variable start_execute_;
+  // The worker thread notifies that work has finished.
+  tensorflow::condition_variable finished_execute_;
+  // Notifies a StartExecute that the previous Join has finished.
+  tensorflow::condition_variable finished_join_;
+
+  // Temporary state between `StartExecute` and `Join`.
+  //   Inputs
+  TFE_Context* context_ TF_GUARDED_BY(execution_mutex_);
+  const char* operation_name_ TF_GUARDED_BY(execution_mutex_);
+  std::vector<TFE_TensorHandle*> op_inputs_ TF_GUARDED_BY(execution_mutex_);
+  const TFE_OpAttrs* attributes_ TF_GUARDED_BY(execution_mutex_);
+  int expected_max_outputs_ TF_GUARDED_BY(execution_mutex_);
+  //   Outputs
+  std::vector<TensorHandlePtr> op_outputs_ TF_GUARDED_BY(execution_mutex_);
+  StatusPtr status_ TF_GUARDED_BY(execution_mutex_);
+
+  const std::string device_;
+  mutable OpPtr op_ TF_GUARDED_BY(execution_mutex_);
+  std::unique_ptr<Thread> thread_;
+};
+
+DeviceThread::~DeviceThread() {
+  {
+    tensorflow::mutex_lock l(execution_mutex_);
+    execution_state_ = ExecutionState::kShuttingDown;
+  }
+  start_execute_.notify_one();
+}
+
+void DeviceThread::Run() {
+  while (true) {
+    {
+      tensorflow::mutex_lock l(execution_mutex_);
+      while (execution_state_ == ExecutionState::kIdle ||
+             execution_state_ == ExecutionState::kHasResult) {
+        start_execute_.wait(l);
+      }
+      if (execution_state_ == ExecutionState::kShuttingDown) {
+        return;
+      } else if (execution_state_ == ExecutionState::kReadyToExecute) {
+        // op_outputs_ may have been std::moved
+        op_outputs_ = std::vector<TensorHandlePtr>();
+        Execute(context_, operation_name_, std::move(op_inputs_), attributes_,
+                expected_max_outputs_, &op_outputs_, status_.get());
+        execution_state_ = ExecutionState::kHasResult;
+      }
+    }
+    finished_execute_.notify_one();
+  }
+}
+
+void DeviceThread::StartExecute(TFE_Context* context,
+                                const char* operation_name,
+                                std::vector<TFE_TensorHandle*> inputs,
+                                const TFE_OpAttrs* attributes,
+                                int expected_max_outputs) {
+  {
+    tensorflow::mutex_lock l(execution_mutex_);
+    while (execution_state_ != ExecutionState::kIdle) {
+      // If there's already a pending execution, wait until Join finishes before
+      // starting on the next operation.
+      finished_join_.wait(l);
+    }
+    context_ = context;
+    operation_name_ = operation_name;
+    op_inputs_ = inputs;
+    attributes_ = attributes;
+    expected_max_outputs_ = expected_max_outputs;
+    execution_state_ = ExecutionState::kReadyToExecute;
+  }
+  start_execute_.notify_one();
+}
+
+std::vector<TensorHandlePtr> DeviceThread::Join(TF_Status* status) {
+  std::vector<TensorHandlePtr> result;
+  {
+    tensorflow::mutex_lock l(execution_mutex_);
+    while (execution_state_ != ExecutionState::kHasResult) {
+      finished_execute_.wait(l);
+    }
+    if (TF_GetCode(status_.get()) != TF_OK) {
+      TF_SetStatus(status, TF_GetCode(status_.get()),
+                   TF_Message(status_.get()));
+    }
+    execution_state_ = ExecutionState::kIdle;
+    result = std::move(op_outputs_);
+  }
+  finished_join_.notify_one();
+  return result;
+}
+
+void DeviceThread::Execute(TFE_Context* context, const char* operation_name,
+                           std::vector<TFE_TensorHandle*> inputs,
+                           const TFE_OpAttrs* attributes,
+                           int expected_max_outputs,
+                           std::vector<TensorHandlePtr>* outputs,
+                           TF_Status* status) const {
+  if (op_ == nullptr) {
+    op_.reset(TFE_NewOp(context, operation_name, status));
+    if (TF_GetCode(status) != TF_OK) return;
+    TFE_OpSetDevice(op_.get(), device_.c_str(), status);
+    if (TF_GetCode(status) != TF_OK) return;
+  } else {
+    TFE_OpReset(op_.get(), operation_name, device_.c_str(), status);
+    if (TF_GetCode(status) != TF_OK) return;
+  }
+  TFE_OpAddAttrs(op_.get(), attributes);
+  for (int input_index = 0; input_index < inputs.size(); ++input_index) {
+    TFE_OpAddInput(op_.get(), inputs[input_index], status);
+    if (TF_GetCode(status) != TF_OK) return;
+  }
+  std::vector<TFE_TensorHandle*> unwrapped_results(expected_max_outputs);
+  int real_num_outputs = expected_max_outputs;
+  if (TF_GetCode(status) != TF_OK) return;
+  TFE_Execute(op_.get(), unwrapped_results.data(), &real_num_outputs, status);
+  if (TF_GetCode(status) != TF_OK) return;
+  unwrapped_results.resize(real_num_outputs);
+  outputs->reserve(real_num_outputs);
+  for (TFE_TensorHandle* unwrapped_result : unwrapped_results) {
+    outputs->emplace_back(unwrapped_result);
+  }
+}
+
 ParallelDevice::ParallelDevice(const std::vector<std::string>& devices)
-    : underlying_devices_(devices),
-      executors_(MakeExecutors(underlying_devices_.size())) {}
+    : underlying_devices_(devices) {
+  device_threads_.reserve(devices.size());
+  for (int device_index = 0; device_index < devices.size(); ++device_index) {
+    device_threads_.emplace_back(
+        new DeviceThread(devices[device_index].c_str()));
+  }
+}
+
+// Necessary for a unique_ptr to a forward-declared type.
+ParallelDevice::~ParallelDevice() = default;
 
 std::unique_ptr<ParallelTensor> ParallelDevice::CopyToParallelDevice(
     TFE_Context* context, TFE_TensorHandle* tensor, TF_Status* status) const {
@@ -108,72 +287,34 @@ ParallelDevice::Execute(TFE_Context* context,
   // Compute per-device per-output tensors
   std::vector<std::vector<TensorHandlePtr>> per_device_output_tensors;
   per_device_output_tensors.reserve(underlying_devices_.size());
-  // TODO(allenl): Add a TFE_ExecuteWithExecutor API so we don't have to keep
-  // setting the thread-local executor like this.
-  TFE_Executor* previous_executor(TFE_ContextGetExecutorForThread(context));
-  auto reset_executor =
-      tensorflow::gtl::MakeCleanup([context, previous_executor]() {
-        TFE_ContextSetExecutorForThread(context, previous_executor);
-        TFE_DeleteExecutor(previous_executor);
-      });
-  int first_op_output_count;
+  int first_op_output_count = 0;
   for (int device_index = 0; device_index < underlying_devices_.size();
        ++device_index) {
-    TFE_Executor* executor = executors_[device_index].get();
-    // Note that the `reset_executor` cleanup sets the thread's executor back to
-    // the value before this function ran.
-    TFE_ContextSetExecutorForThread(context, executor);
-    OpPtr op(TFE_NewOp(context, operation_name, status));
-    if (TF_GetCode(status) != TF_OK) return result;
-    TFE_OpSetDevice(op.get(), underlying_devices_[device_index].c_str(),
-                    status);
-    TFE_OpAddAttrs(op.get(), attributes);
+    DeviceThread* device_thread = device_threads_[device_index].get();
+    std::vector<TFE_TensorHandle*> device_inputs;
+    device_inputs.reserve(inputs.size());
     for (int input_index = 0; input_index < inputs.size(); ++input_index) {
       // Parallel tensors are divided between operations by device.
-      TFE_OpAddInput(op.get(), inputs[input_index]->tensor(device_index),
-                     status);
-      if (TF_GetCode(status) != TF_OK) return result;
+      device_inputs.push_back(inputs[input_index]->tensor(device_index));
     }
-    std::vector<TFE_TensorHandle*> op_outputs(expected_max_outputs);
-    int real_num_outputs = expected_max_outputs;
-    // For nested devices, the inner device sees the async executor we've
-    // set. Inner parallel devices will just overwrite this with their own and
-    // then set it back to ours before returning. This means parallel devices
-    // which consist of several aliased parallel devices would hypothetically
-    // deadlock if the outer parallel device ran one collective with a group
-    // size equal to the total number of aliased physical devices. Currently
-    // physical devices cannot participate in a single collective reduction
-    // multiple times, so this would fail earlier.
-    //
-    // TODO(allenl): Keep a map from outer executor to list of inner executors
-    // rather than a single list of executors so aliased nested parallel devices
-    // don't re-use an executor.
-    TFE_Execute(op.get(), op_outputs.data(), &real_num_outputs, status);
+    device_thread->StartExecute(context, operation_name,
+                                std::move(device_inputs), attributes,
+                                expected_max_outputs);
+  }
+  for (int device_index = 0; device_index < underlying_devices_.size();
+       ++device_index) {
+    DeviceThread* device_thread = device_threads_[device_index].get();
+    per_device_output_tensors.push_back(device_thread->Join(status));
+    if (TF_GetCode(status) != TF_OK) return result;
     if (device_index == 0) {
-      first_op_output_count = real_num_outputs;
+      first_op_output_count = per_device_output_tensors.rbegin()->size();
     } else {
-      if (real_num_outputs != first_op_output_count) {
+      if (per_device_output_tensors.rbegin()->size() != first_op_output_count) {
         TF_SetStatus(status, TF_INTERNAL,
                      "Parallel ops produced different numbers of tensors.");
         return result;
       }
     }
-    if (TF_GetCode(status) != TF_OK) return result;
-    std::vector<TensorHandlePtr> this_outputs;
-    this_outputs.reserve(real_num_outputs);
-    for (int output_num = 0; output_num < real_num_outputs; ++output_num) {
-      this_outputs.emplace_back(op_outputs[output_num]);
-    }
-    per_device_output_tensors.push_back(std::move(this_outputs));
-  }
-  for (int device_index = 0; device_index < underlying_devices_.size();
-       ++device_index) {
-    TFE_Executor* executor = executors_[device_index].get();
-    // TODO(b/157523095): Syncing the executor here shouldn't be
-    // necessary. Currently async+remote is missing cross-executor
-    // coordination.
-    TFE_ExecutorWaitForAllPendingNodes(executor, status);
-    if (TF_GetCode(status) != TF_OK) return result;
   }
   // For each output of the original operation, pack the per-device
   // TensorHandles we've computed into a single parallel TensorHandle.
diff --git a/tensorflow/c/eager/parallel_device/parallel_device_lib.h b/tensorflow/c/eager/parallel_device/parallel_device_lib.h
index 8f3f613e535..cbfea31d95f 100644
--- a/tensorflow/c/eager/parallel_device/parallel_device_lib.h
+++ b/tensorflow/c/eager/parallel_device/parallel_device_lib.h
@@ -41,16 +41,8 @@ class TensorHandleDeleter {
 
 using TensorHandlePtr = std::unique_ptr<TFE_TensorHandle, TensorHandleDeleter>;
 
-class ExecutorDeleter {
- public:
-  void operator()(TFE_Executor* to_delete) const {
-    TFE_DeleteExecutor(to_delete);
-  }
-};
-
-using ExecutorPtr = std::unique_ptr<TFE_Executor, ExecutorDeleter>;
-
 class ParallelTensor;
+class DeviceThread;
 
 // Forwards operations to `devices`, maintaining ParallelTensor with components
 // placed on each underlying device.
@@ -58,6 +50,8 @@ class ParallelDevice {
  public:
   explicit ParallelDevice(const std::vector<std::string>& devices);
 
+  ~ParallelDevice();
+
   // Helper to copy a tensor handle from another device once for each component
   // of the ParallelDevice.
   //
@@ -94,9 +88,19 @@ class ParallelDevice {
   // A sequence of device names, indicating which devices replicated operations
   // are forwarded to.
   const std::vector<std::string> underlying_devices_;
-  // A sequence of TFE_Executors, one per device, for executing operations in
+  // A sequence of thread wrappers, one per device, for executing operations in
   // parallel.
-  const std::vector<ExecutorPtr> executors_;
+  //
+  // Conceptually this is a thread pool with one thread per device. It requires
+  // less synchronization than a thread pool would for this task, since Execute
+  // acquires each thread in order (and so only one Execute will schedule
+  // blocking collective operations at a time), and avoids some dynamic
+  // allocation/scheduling.
+  //
+  // TODO(allenl): Keep a map from outer thread to list of inner threads rather
+  // than a single list of threads so aliased nested parallel devices don't
+  // re-use a thread.
+  std::vector<std::unique_ptr<DeviceThread>> device_threads_;
 };
 
 // Contains a tuple of tensors, one on each of the `underlying_devices_` of the
diff --git a/tensorflow/core/common_runtime/eager/attr_builder.cc b/tensorflow/core/common_runtime/eager/attr_builder.cc
index 9973dcc9051..2d85b2f764d 100644
--- a/tensorflow/core/common_runtime/eager/attr_builder.cc
+++ b/tensorflow/core/common_runtime/eager/attr_builder.cc
@@ -224,6 +224,11 @@ const NodeDef& AttrBuilder::BuildNodeDef() {
   return node_def_;
 }
 
+void AttrBuilder::CopyAttributes(const AttrBuilder& other) {
+  encoded_attrs_.insert(other.encoded_attrs_.begin(),
+                        other.encoded_attrs_.end());
+}
+
 Status AttrTypeByName(const AttrTypeMap& m, const string& attr_name,
                       TF_AttrType* out, unsigned char* is_list) {
   auto* t = gtl::FindOrNull(m, attr_name);
diff --git a/tensorflow/core/common_runtime/eager/attr_builder.h b/tensorflow/core/common_runtime/eager/attr_builder.h
index b2d38080102..38f6c737d8a 100644
--- a/tensorflow/core/common_runtime/eager/attr_builder.h
+++ b/tensorflow/core/common_runtime/eager/attr_builder.h
@@ -83,6 +83,11 @@ Status AttrTypeByName(const AttrTypeMap& m, const string& attr_name,
 // of the NodeDef till BuildNodeDef is called, or Set is called with certain
 // uncommon types (see template specializations of Set to see which types
 // trigger a NodeDef creation).
+//
+// Setting attributes via `Set` may cause arena-allocated protocol buffer
+// messages to be destructed, which is not thread safe. This means that it is
+// currently not safe to set attributes on *different* AttrBuilder objects from
+// multiple threads. This does not apply to `CopyAttributes`.
 class AttrBuilder {
  public:
   AttrBuilder() {}
@@ -150,6 +155,12 @@ class AttrBuilder {
   void FillAttrValueMapWithoutDefaults(AttrValueMap* m) const;
   const NodeDef& BuildNodeDef();
 
+  // Transfers the attributes from `other` to this AttrBuilder. Does not
+  // overwrite existing attributes. Since it does not require deserializing and
+  // re-serializing attributes, it is much more efficient than going through an
+  // AttrValueMap.
+  void CopyAttributes(const AttrBuilder& other);
+
  private:
   tensorflow::Fprint128 BuildCacheKeyForDevice(const StringPiece device) const;
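
With the c_api.cc change, TFE_OpAddAttrs copies attributes between eager ops through the new AttrBuilder::CopyAttributes instead of round-tripping through an AttrValueMap. A minimal sketch of that call path from the C API follows; it assumes a valid TFE_Context* and TF_Status*, elides error checking, and the helper name CopyCastAttrs and the exact header locations are illustrative assumptions rather than part of this change.

// Sketch: copy every attribute from one eager op onto another via the C API.
#include "tensorflow/c/eager/c_api.h"
#include "tensorflow/c/eager/c_api_experimental.h"

void CopyCastAttrs(TFE_Context* ctx, TF_Status* status) {
  TFE_Op* source = TFE_NewOp(ctx, "Cast", status);
  TFE_OpSetAttrType(source, "SrcT", TF_FLOAT);
  TFE_OpSetAttrType(source, "DstT", TF_INT32);

  TFE_Op* destination = TFE_NewOp(ctx, "Cast", status);
  // TFE_OpGetAttrs returns a non-owning view of `source`'s attributes;
  // TFE_OpAddAttrs merges them into `destination`, now via CopyAttributes.
  TFE_OpAddAttrs(destination, TFE_OpGetAttrs(source));

  TFE_DeleteOp(source);
  TFE_DeleteOp(destination);
}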
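
DeviceThread in parallel_device_lib.cc is a single-slot worker: StartExecute hands the background thread exactly one pending op and Join collects its result, with the kIdle -> kReadyToExecute -> kHasResult cycle serializing the handoff. The standalone sketch below illustrates the same protocol using only the standard library; SingleSlotWorker is an invented name, it runs an arbitrary callback instead of TFE_Execute, and it substitutes std::mutex/std::condition_variable/std::thread for tensorflow::mutex and Env threads.

// Simplified stand-in for the DeviceThread handoff protocol (not the
// TensorFlow class): one background thread, one pending work item at a time.
#include <condition_variable>
#include <functional>
#include <mutex>
#include <thread>

class SingleSlotWorker {
 public:
  SingleSlotWorker() : thread_([this]() { Run(); }) {}

  ~SingleSlotWorker() {
    {
      std::unique_lock<std::mutex> lock(mu_);
      state_ = State::kShuttingDown;
    }
    start_execute_.notify_one();
    thread_.join();
  }

  // Analogous to DeviceThread::StartExecute: blocks only if a previous
  // StartExecute has not been Join()ed yet, then hands `work` to the thread.
  void StartExecute(std::function<int()> work) {
    {
      std::unique_lock<std::mutex> lock(mu_);
      finished_join_.wait(lock, [this]() { return state_ == State::kIdle; });
      work_ = std::move(work);
      state_ = State::kReadyToExecute;
    }
    start_execute_.notify_one();
  }

  // Analogous to DeviceThread::Join: blocks until the result is ready.
  int Join() {
    int result;
    {
      std::unique_lock<std::mutex> lock(mu_);
      finished_execute_.wait(lock,
                             [this]() { return state_ == State::kHasResult; });
      result = result_;
      state_ = State::kIdle;
    }
    finished_join_.notify_one();
    return result;
  }

 private:
  enum class State { kIdle, kReadyToExecute, kHasResult, kShuttingDown };

  void Run() {
    while (true) {
      {
        std::unique_lock<std::mutex> lock(mu_);
        start_execute_.wait(lock, [this]() {
          return state_ == State::kReadyToExecute ||
                 state_ == State::kShuttingDown;
        });
        if (state_ == State::kShuttingDown) return;
        result_ = work_();  // Stands in for building and running the TFE_Op.
        state_ = State::kHasResult;
      }
      finished_execute_.notify_one();
    }
  }

  std::mutex mu_;
  std::condition_variable start_execute_, finished_execute_, finished_join_;
  State state_ = State::kIdle;
  std::function<int()> work_;
  int result_ = 0;
  std::thread thread_;  // Declared last so every other member is ready first.
};

ParallelDevice::Execute then fans out by calling StartExecute on every per-device worker in a fixed order and fans in by calling Join in that same order, which is why the DeviceThread comment insists that multiple threads always be acquired in a consistent order to avoid deadlock.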
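
The attr_builder.h comment states that CopyAttributes does not overwrite existing attributes; that follows from the encoded_attrs_.insert() call in attr_builder.cc, since a map insert keeps existing keys. A rough sketch of that behavior is below; it assumes the AttrBuilder(const char*) constructor and the Set overloads already declared in attr_builder.h, and it is not test code from this change.

#include "tensorflow/core/common_runtime/eager/attr_builder.h"
#include "tensorflow/core/framework/types.pb.h"

// Sketch: CopyAttributes inserts the source's encoded attributes, so a key the
// destination already has keeps its existing value.
void CopyAttributesSketch() {
  tensorflow::AttrBuilder source("Cast");
  source.Set("SrcT", tensorflow::DT_FLOAT);
  source.Set("Truncate", true);

  tensorflow::AttrBuilder destination("Cast");
  destination.Set("Truncate", false);

  destination.CopyAttributes(source);
  // "SrcT" is copied from `source`; "Truncate" stays false because
  // `destination` already had it set.
}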