New Timestamped BFCAllocator and GPUKernelTracker.

The first part of this change extends BFCAllocator with an optional
timing counter for recording the time at which each Chunk is freed.
This has no effect for conventional memory management (as
applied to CPU RAM), but can achieve a new behavior when applied
to GPU RAM management.  The default TensorFlow memory allocation
convention for GPU RAM is to Unref the tensors Ref'd by a GPU Op as
soon as the Op has queued its kernel (and before that kernel is known
to have completed execution).  This is safe if the memory is
subsequently allocated to another GPU Op (the usual case) because that
second Op will be sequential on the single GPU compute stream and
hence won't touch the memory until the prior kernel has completed.
But this practice is unsafe if the memory is used for I/O or for an Op
queued on a different compute stream unless some further
synchronization is inserted.

Currently, I/O between a GPU and another device is made safe by
inserting stream dependencies.  Multi-compute-stream computation is
made safe by delaying the Unref of Ref'd tensors until the kernel is
known to have completed, via callback through the GPU-specific
EventMgr.  RDMA networking using GPUDirect is another difficult case
where stream synchronization is not possible and it is necessary to
wait until kernels are known to have completed before allowing
reallocation of the used memory.

Simply delaying the deallocation of memory until kernels are known to
have completed is unsatisfactory because it substantially raises the
high-water memory requirements of a program, drastically affecting the
model architectures that are feasible on a particular GPU model.  The
new freed-at count on BFCAllocator::Chunk is part of a strategy
for maintaining the high-water size efficiency of our current
single-compute-stream GPU memory allocation strategy while reducing
synchronization stalls in I/O uses of GPU RAM.  In the future it
may also be applied to multi-compute-stream execution.

The key idea is that when a request to allocate GPU memory is made we
can also pass along a 'freed-by' count and the allocator is free
to return any Chunk whose freed_count is <= that threshold.
This way we can continue to early-allocate GPU RAM without
restrictions to GPU kernels to be executed on a single compute stream,
while simultaneously satisfying the correctness constraints
needed for off-stream use.

GPUKernelTracker is the other component needed to make this new
strategy work.  It keeps track of the stream queuing and real
completion times of GPU kernels thus making it possible to pick the
largest safe freed-by count when making a request for GPU memory
that must be unemcumbered by other uses immediately.  A secondary
capability of the GPUKernelTracker is that it enables capping the
number of GPU kernels queued on a stream.  Without this cap some TF
models can experience moments when hundreds of kernels are queued on
the single compute stream.  Those queued but-not-executing kernels can
tie up memory that could be used for other purposes before its really
needed, and can delay I/O operations which are queued later and need
to wait for the compute stream to clear, for safety.

The new timestamped memory allocation strategy and pending-kernel
capping are considered experimental features and default off for
now, until more experience is gained.

PiperOrigin-RevId: 232705088
This commit is contained in:
A. Unique TensorFlower 2019-02-06 10:56:19 -08:00 committed by TensorFlower Gardener
parent 4e928af59b
commit 2dcb0a07c8
16 changed files with 487 additions and 37 deletions

View File

@ -2926,6 +2926,7 @@ tf_cuda_library(
CORE_CPU_LIB_HEADERS = CORE_CPU_BASE_HDRS + [
"common_runtime/allocator_retry.h",
"common_runtime/shared_counter.h",
"common_runtime/base_collective_executor.h",
"common_runtime/bfc_allocator.h",
"common_runtime/hierarchical_tree_broadcaster.h",

View File

@ -18,6 +18,7 @@ limitations under the License.
#include "tensorflow/core/common_runtime/bfc_allocator.h"
#include "tensorflow/core/common_runtime/allocator_retry.h"
#include "tensorflow/core/framework/device_base.h"
#include "tensorflow/core/lib/core/bits.h"
#include "tensorflow/core/lib/gtl/stl_util.h"
#include "tensorflow/core/lib/strings/numbers.h"
@ -152,6 +153,7 @@ bool BFCAllocator::Extend(size_t alignment, size_t rounded_bytes) {
c->allocation_id = -1;
c->prev = kInvalidChunkHandle;
c->next = kInvalidChunkHandle;
c->freed_count = 0;
region_manager_.set_handle(c->ptr, h);
@ -180,29 +182,46 @@ void BFCAllocator::DeallocateChunk(ChunkHandle h) {
free_chunks_list_ = h;
}
void* BFCAllocator::AllocateRaw(size_t unused_alignment, size_t num_bytes) {
void* BFCAllocator::AllocateRawInternalWithRetry(
size_t unused_alignment, size_t num_bytes,
const AllocationAttributes& allocation_attr) {
// Fast path: Try once to allocate without getting the retry_helper_ involved
void* r = AllocateRawInternal(unused_alignment, num_bytes, false);
uint64 freed_by_count = 0;
if (allocation_attr.freed_by_func != nullptr) {
freed_by_count = allocation_attr.freed_by_func();
}
void* r =
AllocateRawInternal(unused_alignment, num_bytes, false, freed_by_count);
if (r != nullptr) {
return r;
} else {
static const int64 kMaxMillisToWait = 10000; // 10 seconds
return retry_helper_.AllocateRaw(
[this](size_t a, size_t nb, bool v) {
return AllocateRawInternal(a, nb, v);
r = retry_helper_.AllocateRaw(
[this, &allocation_attr](size_t a, size_t nb, bool v) {
uint64 freed_by_count = 0;
if (allocation_attr.freed_by_func != nullptr) {
freed_by_count = allocation_attr.freed_by_func();
}
return AllocateRawInternal(a, nb, v, freed_by_count);
},
kMaxMillisToWait, unused_alignment, num_bytes);
return r;
}
}
void* BFCAllocator::AllocateRaw(size_t unused_alignment, size_t num_bytes,
const AllocationAttributes& allocation_attr) {
VLOG(1) << "AllocateRaw " << Name() << " " << num_bytes;
if (allocation_attr.no_retry_on_failure) {
// Return immediately upon the first failure if this is for allocating an
// optional scratch space.
bool dump_log_on_failure = VLOG_IS_ON(2);
void* result =
AllocateRawInternal(unused_alignment, num_bytes, dump_log_on_failure);
uint64 freed_by_count = 0;
if (allocation_attr.freed_by_func != nullptr) {
freed_by_count = allocation_attr.freed_by_func();
}
void* result = AllocateRawInternal(unused_alignment, num_bytes,
dump_log_on_failure, freed_by_count);
if (result == nullptr) {
static std::atomic<int32> log_counter{0};
int32 counter_value = log_counter.load(std::memory_order_relaxed);
@ -218,7 +237,8 @@ void* BFCAllocator::AllocateRaw(size_t unused_alignment, size_t num_bytes,
}
return result;
} else {
return AllocateRaw(unused_alignment, num_bytes);
return AllocateRawInternalWithRetry(unused_alignment, num_bytes,
allocation_attr);
}
}
@ -233,7 +253,8 @@ size_t BFCAllocator::RoundedBytes(size_t bytes) {
void* BFCAllocator::AllocateRawInternal(size_t unused_alignment,
size_t num_bytes,
bool dump_log_on_failure) {
bool dump_log_on_failure,
uint64 freed_before) {
if (num_bytes == 0) {
LOG(ERROR) << "tried to allocate 0 bytes";
return nullptr;
@ -247,14 +268,14 @@ void* BFCAllocator::AllocateRawInternal(size_t unused_alignment,
BinNum bin_num = BinNumForSize(rounded_bytes);
mutex_lock l(lock_);
void* ptr = FindChunkPtr(bin_num, rounded_bytes, num_bytes);
void* ptr = FindChunkPtr(bin_num, rounded_bytes, num_bytes, freed_before);
if (ptr != nullptr) {
return ptr;
}
// Try to extend
if (Extend(unused_alignment, rounded_bytes)) {
ptr = FindChunkPtr(bin_num, rounded_bytes, num_bytes);
ptr = FindChunkPtr(bin_num, rounded_bytes, num_bytes, freed_before);
if (ptr != nullptr) {
return ptr;
}
@ -274,7 +295,7 @@ void* BFCAllocator::AllocateRawInternal(size_t unused_alignment,
}
void* BFCAllocator::FindChunkPtr(BinNum bin_num, size_t rounded_bytes,
size_t num_bytes) {
size_t num_bytes, uint64 freed_before) {
// First identify the first bin that could satisfy rounded_bytes.
for (; bin_num < kNumBins; bin_num++) {
// Start searching from the first bin for the smallest chunk that fits
@ -285,6 +306,9 @@ void* BFCAllocator::FindChunkPtr(BinNum bin_num, size_t rounded_bytes,
const BFCAllocator::ChunkHandle h = (*citer);
BFCAllocator::Chunk* chunk = ChunkFromHandle(h);
DCHECK(!chunk->in_use());
if (freed_before > 0 && freed_before < chunk->freed_count) {
continue;
}
if (chunk->size >= rounded_bytes) {
// We found an existing chunk that fits us that wasn't in use, so remove
// it from the free bin structure prior to using.
@ -347,6 +371,9 @@ void BFCAllocator::SplitChunk(BFCAllocator::ChunkHandle h, size_t num_bytes) {
// The new chunk is not in use.
new_chunk->allocation_id = -1;
// It inherits the freed time.
new_chunk->freed_count = c->freed_count;
// Maintain the pointers.
// c <-> c_neighbor becomes
// c <-> new_chunk <-> c_neighbor
@ -415,6 +442,9 @@ void BFCAllocator::Merge(BFCAllocator::ChunkHandle h1,
// Set the new size
c1->size += c2->size;
// Pick latest free time.
c1->freed_count = std::max(c1->freed_count, c2->freed_count);
DeleteChunk(h2);
}
@ -460,6 +490,11 @@ void BFCAllocator::FreeAndMaybeCoalesce(BFCAllocator::ChunkHandle h) {
// Mark the chunk as no longer in use.
c->allocation_id = -1;
// Optionally record the free time.
if (timing_counter_) {
c->freed_count = timing_counter_->next();
}
// Updates the stats.
stats_.bytes_in_use -= c->size;
@ -630,7 +665,10 @@ void BFCAllocator::DumpMemoryLog(size_t num_bytes) {
in_use_by_size[c->size]++;
}
LOG(INFO) << (c->in_use() ? "Chunk" : "Free ") << " at " << c->ptr
<< " of size " << c->size;
<< " of size " << c->size
<< (timing_counter_
? strings::StrCat(" freed_count ", c->freed_count)
: "");
h = c->next;
}
}

View File

@ -23,6 +23,7 @@ limitations under the License.
#include <vector>
#include "tensorflow/core/common_runtime/allocator_retry.h"
#include "tensorflow/core/common_runtime/shared_counter.h"
#include "tensorflow/core/framework/allocator.h"
#include "tensorflow/core/lib/gtl/stl_util.h"
#include "tensorflow/core/lib/strings/strcat.h"
@ -50,9 +51,14 @@ class BFCAllocator : public Allocator {
~BFCAllocator() override;
string Name() override { return name_; }
void* AllocateRaw(size_t alignment, size_t num_bytes) override;
void* AllocateRaw(size_t alignment, size_t num_bytes) override {
return AllocateRaw(alignment, num_bytes, AllocationAttributes());
}
void* AllocateRaw(size_t alignment, size_t num_bytes,
const AllocationAttributes& allocation_attr) override;
void DeallocateRaw(void* ptr) override;
bool TracksAllocationSizes() override;
@ -67,11 +73,19 @@ class BFCAllocator : public Allocator {
void ClearStats() override;
void SetTimingCounter(SharedCounter* sc) { timing_counter_ = sc; }
private:
struct Bin;
void* AllocateRawInternal(size_t alignment, size_t num_bytes,
bool dump_log_on_failure);
bool dump_log_on_failure,
uint64 freed_before_count);
void* AllocateRawInternalWithRetry(
size_t alignment, size_t num_bytes,
const AllocationAttributes& allocation_attr);
void DeallocateRawInternal(void* ptr);
// A ChunkHandle is an index into the chunks_ vector in BFCAllocator
@ -126,6 +140,9 @@ class BFCAllocator : public Allocator {
// What bin are we in?
BinNum bin_num = kInvalidBinNum;
// Optional count when this chunk was most recently made free.
uint64 freed_count = 0;
bool in_use() const { return allocation_id != -1; }
string DebugString(BFCAllocator* a,
@ -314,8 +331,8 @@ class BFCAllocator : public Allocator {
// Returns a pointer to an underlying allocated chunk of size
// 'rounded_bytes'.
void* FindChunkPtr(BinNum bin_num, size_t rounded_bytes, size_t num_bytes)
EXCLUSIVE_LOCKS_REQUIRED(lock_);
void* FindChunkPtr(BinNum bin_num, size_t rounded_bytes, size_t num_bytes,
uint64 freed_before) EXCLUSIVE_LOCKS_REQUIRED(lock_);
// Splits the chunk specified by 'h' into two chunks, one at least
// of size 'num_bytes'.
@ -420,6 +437,7 @@ class BFCAllocator : public Allocator {
std::unique_ptr<SubAllocator> sub_allocator_;
string name_;
SharedCounter* timing_counter_ = nullptr;
// Structures mutable after construction
mutable mutex lock_;

View File

@ -276,6 +276,28 @@ BaseGPUDevice::BaseGPUDevice(const SessionOptions& options, const string& name,
sync_every_op_(sync_every_op),
max_streams_(max_streams) {
GPUProcessState::singleton()->EnableGPUDevice();
pending_cap_ = options.config.gpu_options().experimental().pending_cap();
timestamped_allocator_ =
options.config.gpu_options().experimental().timestamped_allocator();
if (timestamped_allocator_ || pending_cap_ > 0) {
std::unique_ptr<SharedCounter> timing_counter;
if (timestamped_allocator_) {
// In this case the SharedCounter was already created and set in the
// associated Allocator, with ownership by GPUProcessState. Here we take
// over ownership of that SharedAllocator to transfer it to the
// GPUKernelTracker.
timing_counter =
GPUProcessState::singleton()->ReleaseGPUAllocatorCounter(tf_gpu_id);
DCHECK(timing_counter.get());
} else {
DCHECK_GT(pending_cap_, 0);
// In this case we need a SharedCounter to be owned by GPUKernelTracker
// but one was not created for use by the Allocator, so we create one.
timing_counter.reset(new SharedCounter);
}
kernel_tracker_.reset(
new GPUKernelTracker(Env::Default(), std::move(timing_counter)));
}
}
BaseGPUDevice::~BaseGPUDevice() {
@ -508,6 +530,10 @@ void BaseGPUDevice::ComputeHelper(OpKernel* op_kernel,
if (idc->stream() != stream) stream->ThenWaitFor(idc->stream());
}
}
if (pending_cap_ > 0) {
DCHECK(kernel_tracker_);
kernel_tracker_->PauseWhilePendingExceeds(pending_cap_);
}
se::cuda::ScopedActivateExecutorContext scoped_activation{stream->parent()};
op_kernel->Compute(context);
if (context->status().ok()) {
@ -525,6 +551,14 @@ void BaseGPUDevice::ComputeHelper(OpKernel* op_kernel,
VLOG(1) << "GpuDevice::ComputeHelper scheduled "
<< ComputeOpKernelDebugString(*op_kernel, stream_id);
}
if (kernel_tracker_) {
GPUKernelTracker* tracker = kernel_tracker_.get();
DCHECK(tracker);
uint64 queued_count = tracker->RecordQueued();
em_->ThenExecute(stream, [op_kernel, tracker, queued_count]() {
tracker->RecordTerminated(queued_count);
});
}
} else {
if (vlog_1) {
VLOG(1) << "GpuDevice::ComputeHelper failed to schedule "
@ -721,8 +755,8 @@ Status ParseVisibleDeviceList(const string& visible_device_list,
if (!strings::safe_strto32(platform_gpu_id_str, &platform_gpu_id)) {
return errors::InvalidArgument(
"Could not parse entry in 'visible_device_list': '",
platform_gpu_id_str, "'. visible_device_list = ",
visible_device_list);
platform_gpu_id_str,
"'. visible_device_list = ", visible_device_list);
}
if (platform_gpu_id < 0 ||
platform_gpu_id >= gpu_manager->VisibleDeviceCount()) {
@ -957,15 +991,15 @@ Status BaseGPUDeviceFactory::CreateDevices(
for (PlatformGpuId platform_gpu_id : valid_platform_gpu_ids) {
err = cudaSetDevice(platform_gpu_id.value());
if (err != cudaSuccess) {
return errors::Internal("cudaSetDevice() on GPU:",
platform_gpu_id.value(), " failed. Status: ",
cudaGetErrorString(err));
return errors::Internal(
"cudaSetDevice() on GPU:", platform_gpu_id.value(),
" failed. Status: ", cudaGetErrorString(err));
}
err = cudaFree(nullptr);
if (err != cudaSuccess) {
return errors::Internal("CUDA runtime implicit initialization on GPU:",
platform_gpu_id.value(), " failed. Status: ",
cudaGetErrorString(err));
platform_gpu_id.value(),
" failed. Status: ", cudaGetErrorString(err));
}
}
// Reset to the original device.
@ -1517,6 +1551,115 @@ Status BaseGPUDeviceFactory::GetValidDeviceIds(
return Status::OK();
}
uint64 BaseGPUDevice::SafeAllocFrontier() {
if (timestamped_allocator_) {
return kernel_tracker_->LastTerminatedCount();
} else {
return 0;
}
}
int BaseGPUDevice::PendingKernels() {
if (kernel_tracker_) {
return kernel_tracker_->NumPending();
}
return 0;
}
uint64 GPUKernelTracker::RecordQueued() {
mutex_lock l(mu_);
uint64 queued_count = timing_counter_->next();
VLOG(2) << "RecordQueued queued_count=" << queued_count
<< " first_available_=" << first_available_
<< " last_completed_=" << last_completed_
<< " num_pending_=" << num_pending_;
pending_kernels_[first_available_].queued_count = queued_count;
pending_kernels_[first_available_].terminated = false;
++first_available_;
++num_pending_;
if (first_available_ >= pending_kernels_.size()) {
first_available_ = 0;
}
if (first_available_ == last_completed_) {
// Ring buffer is full: double it. All of the same valid PendingKernel
// entries exist after the copy, they are just shifted to begin
// at index 0 in the new array.
std::vector<PendingKernel> new_buffer(pending_kernels_.size() * 2);
for (int i = 0; i < pending_kernels_.size(); ++i) {
int j = (i + last_completed_) % pending_kernels_.size();
new_buffer[i] = pending_kernels_[j];
}
last_completed_ = 0;
first_available_ = pending_kernels_.size();
pending_kernels_.swap(new_buffer);
VLOG(1) << "last_completed_=" << last_completed_
<< " first_available_=" << first_available_
<< " num_pending_=" << num_pending_;
}
DCHECK_NE(first_available_, last_completed_) << "exhausted pending_kernels";
return queued_count;
}
void GPUKernelTracker::RecordTerminated(uint64 queued_count) {
mutex_lock l(mu_);
VLOG(2) << "RecordTerminated queued_count=" << queued_count
<< " first_available_=" << first_available_
<< " last_completed_=" << last_completed_
<< " num_pending_=" << num_pending_ << " LC="
<< ((last_completed_ >= 0)
? pending_kernels_[last_completed_].queued_count
: -1);
DCHECK_NE(first_available_, last_completed_);
DCHECK_GT(num_pending_, 0);
// Starting just past the last completed entry, find the entry with
// this queued_count and mark it done.
int index = (last_completed_ + 1) % pending_kernels_.size();
while (true) {
if (index == first_available_) {
// This should never happen.
LOG(FATAL) << "Failed to find " << queued_count // Crash OK
<< " in queue";
}
if (pending_kernels_[index].queued_count == queued_count) {
pending_kernels_[index].terminated = true;
break;
}
index = (index + 1) % pending_kernels_.size();
}
// Next move last_completed_ forward past all completed kernels. In theory
// kernels should always complete in queued order so we should be able to
// advance the completed frontier to the last queued PendingKernel. In
// practice we occassionally see the termination callbacks arrive out of order
// probably because of thread scheduling. Eventually we may support out-of-
// order completion involving multple compute streams so here we follow a
// conservative approach and wait for every single callback to arrive before
// advancing the frontier.
while (true) {
int next_index = (last_completed_ + 1) % pending_kernels_.size();
if (next_index == first_available_) break;
if (pending_kernels_[next_index].terminated) {
last_completed_ = next_index;
} else {
break;
}
}
// Last decrease num_pending before maybe waking a waiter.
--num_pending_;
pending_decreased_.notify_one();
}
uint64 GPUKernelTracker::LastTerminatedCount() {
mutex_lock l(mu_);
if (last_completed_ < 0) {
// This is an edge case that can be encountered only at the beginning of
// execution. There's not yet a safe threshold count. We don't want to
// return 0 since that bypasses the count mechanism in BFCAllocator, so
// return the least non-zero value.
return 1;
}
return pending_kernels_[last_completed_].queued_count;
}
} // namespace tensorflow
#endif // GOOGLE_CUDA

View File

@ -34,6 +34,7 @@ limitations under the License.
#include "tensorflow/core/common_runtime/gpu_device_context.h"
#include "tensorflow/core/common_runtime/local_device.h"
#include "tensorflow/core/common_runtime/scoped_allocator_mgr.h"
#include "tensorflow/core/common_runtime/shared_counter.h"
#include "tensorflow/core/framework/allocator.h"
#include "tensorflow/core/framework/device_base.h"
#include "tensorflow/core/framework/op_kernel.h"
@ -46,6 +47,7 @@ limitations under the License.
#include "tensorflow/core/public/session_options.h"
namespace tensorflow {
class GPUKernelTracker;
class BaseGPUDevice : public LocalDevice {
public:
@ -114,6 +116,17 @@ class BaseGPUDevice : public LocalDevice {
return scoped_allocator_mgr_.get();
}
// The following two functions always return 0 unless one of the
// related experimental config options has been specified.
// If returned value is > 0 then GPU Memory chunks freed before this count
// are guaranteed not to be in use by any kernel pending on this device.
uint64 SafeAllocFrontier() override;
// Returns the number of kernels that have been queued for execution on
// the compute stream and are not yet known to have completed.
int PendingKernels();
protected:
Allocator* gpu_allocator_; // not owned
Allocator* cpu_allocator_; // not owned
@ -141,6 +154,9 @@ class BaseGPUDevice : public LocalDevice {
const int32 max_streams_;
std::unique_ptr<EventMgr> em_;
std::unique_ptr<thread::ThreadPool> thread_pool_;
std::unique_ptr<GPUKernelTracker> kernel_tracker_;
int pending_cap_ = 0;
bool timestamped_allocator_ = false;
// Initialize scractch buffers used by Eigen.
Status InitScratchBuffers();
@ -163,6 +179,75 @@ class BaseGPUDevice : public LocalDevice {
StatusCallback done);
};
// A per-compute-stream utility that keeps track of kernels that have been
// queued for execution but may not yet have terminated, and also the queued
// time of the most recently terminated kernel.
class GPUKernelTracker {
public:
explicit GPUKernelTracker(Env* env,
std::unique_ptr<SharedCounter> timing_counter)
: env_(env),
timing_counter_(std::move(timing_counter)),
pending_kernels_(64) {}
// Record that a GPU kernel has just been enqueued on the compute stream.
// Inserts a new timing counter value in a new PendingKernel record appended
// to the end of the ring buffer then returns that same count.
uint64 RecordQueued();
// Takes a count value returned by RecordQueued and finds the corresponding
// PendingKernel record in the ring buffer. Marks the kernel as completed and
// advances the completion frontier accordingly.
void RecordTerminated(uint64 at_count);
// Returns the largest timing count such that all kernels queued no
// later than that count are known to have terminated.
uint64 LastTerminatedCount();
// Returns the number of kernels enqueued that are not yet known to
// have terminated.
int NumPending() {
mutex_lock l(mu_);
return num_pending_;
}
// Yield current thread until number of pending kernels no longer
// exceeds the cap.
void PauseWhilePendingExceeds(int cap) {
mutex_lock l(mu_);
while (num_pending_ > cap) {
pending_decreased_.wait(l);
}
}
private:
Env* env_;
std::unique_ptr<SharedCounter> timing_counter_;
// Records when a kernel was queued for execution. Kernel launches are
// identified by a unique count value from a per-GPU device timing counter.
struct PendingKernel {
uint64 queued_count;
bool terminated;
PendingKernel(const PendingKernel& pk)
: queued_count(pk.queued_count), terminated(pk.terminated) {}
PendingKernel() : queued_count(0), terminated(false) {}
};
mutex mu_;
// Ring buffer of PendingKernel records.
std::vector<PendingKernel> pending_kernels_ GUARDED_BY(mu_);
// Next unused slot in pending_kernels_.
int first_available_ GUARDED_BY(mu_) = 0;
// Last completed PendingKernel such that all prior PendingKernels are
// also completed. With out-of-order completion there may be a mixture
// of completed and uncompleted entries between last_completed_ and
// first_available_, hence num_pending_ is not guaranteed equal to
// their differerence.
int last_completed_ GUARDED_BY(mu_) = -1;
int num_pending_ GUARDED_BY(mu_) = 0;
condition_variable pending_decreased_ GUARDED_BY(mu_);
};
class BaseGPUDeviceFactory : public DeviceFactory {
public:
Status CreateDevices(const SessionOptions& options, const string& name_prefix,

View File

@ -24,6 +24,7 @@ limitations under the License.
#include "tensorflow/core/lib/core/status.h"
#include "tensorflow/core/lib/core/status_test_util.h"
#include "tensorflow/core/lib/gtl/stl_util.h"
#include "tensorflow/core/lib/random/random.h"
#include "tensorflow/core/platform/test.h"
namespace tensorflow {
@ -276,6 +277,71 @@ TEST_F(GPUDeviceTest, UnifiedMemoryAllocation) {
allocator->DeallocateRaw(ptr);
}
class GPUKernelTrackerTest : public ::testing::Test {
protected:
void SetUp() {
std::unique_ptr<SharedCounter> counter(new SharedCounter);
timing_counter_ = counter.get();
kernel_tracker_.reset(
new GPUKernelTracker(Env::Default(), std::move(counter)));
}
std::unique_ptr<GPUKernelTracker> kernel_tracker_;
SharedCounter* timing_counter_ = nullptr;
};
TEST_F(GPUKernelTrackerTest, basic) {
EXPECT_EQ(0, kernel_tracker_->NumPending());
// 1 is the expected value when no kernels have yet terminated.
EXPECT_EQ(1, kernel_tracker_->LastTerminatedCount());
std::deque<int64> queued_counts;
for (int i = 0; i < 32; ++i) {
queued_counts.push_back(kernel_tracker_->RecordQueued());
}
EXPECT_EQ(32, kernel_tracker_->NumPending());
EXPECT_EQ(1, kernel_tracker_->LastTerminatedCount());
// Mature the kernels in order until empty.
while (!queued_counts.empty()) {
int64 x = queued_counts.front();
queued_counts.pop_front();
kernel_tracker_->RecordTerminated(x);
EXPECT_EQ(queued_counts.size(), kernel_tracker_->NumPending());
EXPECT_EQ(x, kernel_tracker_->LastTerminatedCount());
}
EXPECT_EQ(timing_counter_->get(), kernel_tracker_->LastTerminatedCount());
// Next inject so many kernel events that the ring buffer needs
// to grow a couple of times, while maturing a few in random order
// to introduce gaps between last_completed_ and first_available_.
int64 lower_bound = timing_counter_->get();
for (int i = 0; i < 1111; ++i) {
queued_counts.push_back(kernel_tracker_->RecordQueued());
int64 upper_bound = timing_counter_->get();
if (0 == (i % 16)) {
size_t index = (random::New64() % queued_counts.size());
kernel_tracker_->RecordTerminated(queued_counts[index]);
queued_counts.erase(queued_counts.begin() + index);
EXPECT_LE(lower_bound, kernel_tracker_->LastTerminatedCount());
EXPECT_GE(upper_bound, kernel_tracker_->LastTerminatedCount());
}
}
// Next mature the remaining kernels in order until empty.
while (!queued_counts.empty()) {
int64 x = queued_counts.front();
queued_counts.pop_front();
kernel_tracker_->RecordTerminated(x);
EXPECT_EQ(queued_counts.size(), kernel_tracker_->NumPending());
// There may be a gap here where we find a kernel that got terminated
// out of order, earlier, so the LastTerminatedCount can actually
// jump past x.
EXPECT_LE(x, kernel_tracker_->LastTerminatedCount());
}
EXPECT_EQ(timing_counter_->get(), kernel_tracker_->LastTerminatedCount());
}
} // namespace tensorflow
#endif

View File

@ -241,7 +241,9 @@ void EventMgr::QueueInUse(se::Stream* stream, InUse iu) {
// events have recorded, and then retire them. Initial observations
// suggest that typical behavior in a TensorFlow program is to have
// 0-3 events pending most of the time, but there are occasionally
// spikes of up to several hundred outstanding.
// spikes of up to several hundred outstanding. (If GPUKernelTracker
// is used to cap pending kernels there should never be more than
// that many.)
//
// NOTE: If all events are on the same stream, no later event will
// complete before an earlier event, except possibly if the earlier
@ -249,13 +251,10 @@ void EventMgr::QueueInUse(se::Stream* stream, InUse iu) {
// looking past the first kPending event. However, if we're using
// multiple streams there may be some gain in looking deeper.
// As a compromise, PollEvent() calls that are triggered by the queueing
// of a single event never look past the first kPending event. Calls
// coming from the dedicated polling thread always sweep the full queue.
//
// Note that allowing the queue to grow very long could cause overall
// GPU memory use to spike needlessly. An alternative strategy would
// be to throttle new Op execution until the pending event queue
// clears.
// of a single event never look past the first kPending event. Consequently
// those calls do an expected constant amount of work, unaffected by the
// length of the pending queue. Calls coming from the dedicated
// polling thread always sweep the full queue.
void EventMgr::PollEvents(bool is_dedicated_poller,
gtl::InlinedVector<InUse, 4>* to_free) {
VLOG(2) << "PollEvents free_events_ " << free_events_.size()

View File

@ -27,6 +27,7 @@ limitations under the License.
#include "tensorflow/core/common_runtime/gpu/gpu_id_utils.h"
#include "tensorflow/core/common_runtime/gpu/gpu_init.h"
#include "tensorflow/core/common_runtime/pool_allocator.h"
#include "tensorflow/core/common_runtime/shared_counter.h"
#include "tensorflow/core/framework/allocator.h"
#include "tensorflow/core/framework/log_memory.h"
#include "tensorflow/core/framework/tracking_allocator.h"
@ -110,9 +111,15 @@ Allocator* GPUProcessState::GetGPUAllocator(const GPUOptions& options,
(options.per_process_gpu_memory_fraction() > 1.0 ||
options.experimental().use_unified_memory()),
gpu_visitors_[bus_id], {});
Allocator* gpu_allocator =
GPUBFCAllocator* gpu_bfc_allocator =
new GPUBFCAllocator(sub_allocator, total_bytes, options,
strings::StrCat("GPU_", tf_gpu_id.value(), "_bfc"));
Allocator* gpu_allocator = gpu_bfc_allocator;
SharedCounter* timing_counter = nullptr;
if (options.experimental().timestamped_allocator()) {
timing_counter = new SharedCounter;
gpu_bfc_allocator->SetTimingCounter(timing_counter);
}
// If true, checks for memory overwrites by writing
// distinctive patterns on both ends of allocated memory.
@ -137,7 +144,9 @@ Allocator* GPUProcessState::GetGPUAllocator(const GPUOptions& options,
recording_allocator = new internal::RecordingAllocator(
&process_state_->mem_desc_map_, gpu_allocator, md, &mu_);
}
allocator_parts = {std::unique_ptr<Allocator>(gpu_allocator), sub_allocator,
allocator_parts = {std::unique_ptr<Allocator>(gpu_allocator),
std::unique_ptr<SharedCounter>(timing_counter),
sub_allocator,
std::unique_ptr<Allocator>(recording_allocator)};
}
if (process_state_->ProcessState::FLAGS_brain_gpu_record_mem_types) {
@ -151,6 +160,23 @@ Allocator* GPUProcessState::GetGPUAllocator(const GPUOptions& options,
#endif // GOOGLE_CUDA
}
std::unique_ptr<SharedCounter> GPUProcessState::ReleaseGPUAllocatorCounter(
TfGpuId tf_gpu_id) {
DCHECK(process_state_);
#if GOOGLE_CUDA
GpuIdUtil::CheckValidTfGpuId(tf_gpu_id);
mutex_lock l(mu_);
if (tf_gpu_id.value() >= static_cast<int64>(gpu_allocators_.size())) {
return nullptr;
}
AllocatorParts& allocator_parts = gpu_allocators_[tf_gpu_id.value()];
return std::move(allocator_parts.counter);
#else
return nullptr;
#endif
}
Allocator* GPUProcessState::GetCUDAHostAllocator(int numa_node) {
CHECK(process_state_);
if (!HasGPUDevice() ||
@ -224,6 +250,7 @@ Allocator* GPUProcessState::GetCUDAHostAllocator(int numa_node) {
allocator = new TrackingAllocator(allocator, true);
}
cuda_host_allocators_.push_back({std::unique_ptr<Allocator>(allocator),
std::unique_ptr<SharedCounter>(nullptr),
sub_allocator,
std::unique_ptr<Allocator>(nullptr)});
AllocatorParts& allocator_parts = cuda_host_allocators_.back();

View File

@ -23,6 +23,7 @@ limitations under the License.
#include "tensorflow/core/common_runtime/gpu/gpu_id.h"
#include "tensorflow/core/common_runtime/process_state.h"
#include "tensorflow/core/common_runtime/shared_counter.h"
#include "tensorflow/core/framework/allocator.h"
#include "tensorflow/core/platform/mutex.h"
#include "tensorflow/core/platform/thread_annotations.h"
@ -33,6 +34,7 @@ namespace tensorflow {
class Allocator;
class PoolAllocator;
class SharedCounter;
// Singleton that manages per-process state when GPUs are present.
class GPUProcessState {
@ -108,6 +110,8 @@ class GPUProcessState {
// Returns bus_id for the given GPU id.
virtual int BusIdForGPU(TfGpuId tf_gpu_id);
std::unique_ptr<SharedCounter> ReleaseGPUAllocatorCounter(TfGpuId tf_gpu_id);
protected:
// GPUProcessState is a singleton that should not normally be deleted except
// at process shutdown.
@ -132,6 +136,7 @@ class GPUProcessState {
struct AllocatorParts {
std::unique_ptr<Allocator> allocator;
std::unique_ptr<SharedCounter> counter;
SubAllocator* sub_allocator; // owned by allocator
std::unique_ptr<Allocator> recording_allocator;
};

View File

@ -0,0 +1,31 @@
/* Copyright 2019 The TensorFlow Authors. All Rights Reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
==============================================================================*/
#ifndef TENSORFLOW_CORE_COMMON_RUNTIME_SHARED_COUNTER_H_
#define TENSORFLOW_CORE_COMMON_RUNTIME_SHARED_COUNTER_H_
namespace tensorflow {
// A lightweight thread-safe monotone counter for establishing
// temporal ordering.
class SharedCounter {
public:
int64 get() { return value_; }
int64 next() { return ++value_; }
private:
std::atomic<int64> value_{0};
};
} // namespace tensorflow
#endif // TENSORFLOW_CORE_COMMON_RUNTIME_SHARED_COUNTER_H_

View File

@ -68,13 +68,14 @@ Status TensorResponse::InitFrom(RecvTensorResponse* response) {
return s;
}
void TensorResponse::InitPartial(const RecvTensorResponse& response) {
void TensorResponse::InitPartial(const RecvTensorResponse& response,
const AllocationAttributes& allocation_attr) {
// Everything except content is present in *response. Content will
// arrive later; allocate a Tensor with appropriate storage for that
// content.
meta_ = response;
TensorShape shape(meta_.tensor().tensor_shape());
Tensor t(allocator_, meta_.tensor().dtype(), shape);
Tensor t(allocator_, meta_.tensor().dtype(), shape, allocation_attr);
tensor_ = std::move(t);
}

View File

@ -76,7 +76,8 @@ class TensorResponse {
// Initialize tensor metadata from response and allocate
// uninitialized backing storage for actual contents.
void InitPartial(const RecvTensorResponse& response);
void InitPartial(const RecvTensorResponse& response,
const AllocationAttributes& allocation_attr);
// Return a reference to the parsed tensor. The tensor will remain
// live only until *this is destroyed or modified.

View File

@ -46,6 +46,10 @@ struct AllocationAttributes {
// which Op is performing the allocation, and sets this flag to
// true.
bool allocation_will_be_logged = false;
// EXPERIMENTAL: If provided, then evaluates to a timing count such that only
// a memory chunk whose last-freed count is at this value or earlier may be
// returned.
std::function<uint64()> freed_by_func = nullptr;
};
// Runtime statistics collected by an allocator.

View File

@ -246,6 +246,15 @@ class DeviceBase {
return errors::Internal("Device does not implement MakeTensorFromProto()");
}
// Some devices (i.e. GPUs) may free device memory prior to its actual use
// being completed on the assumption that subsequent allocations can only be
// used serially with respect to pending uses. If this function returns a
// non-zero value it is the value of a device-specific counter such that any
// device memory tagged with an earlier freed-at count is really unencumbered
// by pending uses. For this to be useful the device memory allocator must
// be tagging deallocated memory chunks using the same counter.
virtual uint64 SafeAllocFrontier() { return 0; }
protected:
// Does not take ownership.
void set_tensorflow_device_thread_pool(thread::ThreadPool* thread_pool) {

View File

@ -156,6 +156,16 @@ message GPUOptions {
// CollectiveReduce, and serves as an override to automatic ring order
// generation in OrderTaskDeviceMap() during CollectiveParam resolution.
string collective_ring_order = 4;
// If true then extra work is done by GPUDevice and GPUBFCAllocator to
// keep track of when GPU memory is freed and when kernels actually
// complete so that we can know when a nominally free memory chunk
// is really not subject to pending use.
bool timestamped_allocator = 5;
// If > 0 limit the number of pending kernels on any compute
// stream to this number.
int32 pending_cap = 6;
}
// Everything inside experimental is subject to change and is not subject

View File

@ -84,6 +84,18 @@ tf_proto {
label: LABEL_OPTIONAL
type: TYPE_STRING
}
field {
name: "timestamped_allocator"
number: 5
label: LABEL_OPTIONAL
type: TYPE_BOOL
}
field {
name: "pending_cap"
number: 6
label: LABEL_OPTIONAL
type: TYPE_INT32
}
nested_type {
name: "VirtualDevices"
field {