Automated rollback of commit 20c979ea235dfb6a923dde6b9c8f4c7e9f310efa
PiperOrigin-RevId: 246892106
parent 2707117ab8
commit f88823d923
@@ -374,28 +374,13 @@ struct UnsortedSegmentFunctor<CPUDevice, T, Index, InitialValueF, ReductionF> {
                   typename TTypes<Index>::ConstFlat segment_ids,
                   const Index data_size, const T* data,
                   typename TTypes<T, 2>::Tensor output) {
-    auto cpu_device = ctx->eigen_cpu_device();
-    output.device(cpu_device) = output.constant(InitialValueF()());
+    output.setConstant(InitialValueF()());
     if (data_size == 0) {
       return;
     }
-
-    // This functor reduces an N-row input to an M-row output, where
-    // N: segment_ids.dimension(0)
-    // M: num_segments
     const int64 N = segment_ids.dimension(0);
-    const int64 inner_dim = data_size / N;
     ReductionF reduction;
-    auto data_flat = typename TTypes<T, 2>::ConstTensor(data, N, inner_dim);
-    const int64 num_threads = cpu_device.numThreads();
-
-    // 'num_reductions' counts the output rows that actually receive a
-    // reduction; rows left at InitialValueF() are excluded. It also bounds
-    // the maximum degree of parallelism.
-    int64 num_reductions = 0;
-    // 'row_counter' records how many input rows reduce into each output
-    // row; rows left at InitialValueF() stay at 0.
-    std::vector<Index> row_counter(num_segments, 0);
+    auto data_flat = typename TTypes<T, 2>::ConstTensor(data, N, data_size / N);
     for (int64 i = 0; i < N; ++i) {
       Index j = internal::SubtleMustCopy(segment_ids(i));
       if (j < 0) {
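For orientation: both sides of this hunk compute the same reduction; the rollback only swaps the multithreaded implementation back for the original serial loop. Below is a minimal, framework-free sketch of those semantics for a Sum reduction (function and variable names are illustrative, not TensorFlow API).

#include <cstdint>
#include <iostream>
#include <vector>

// Unsorted segment sum over rows of a flattened [N, inner_dim] input:
// out[segment_ids[i]] += data[i] for every valid row i. Rows with a
// negative segment id are skipped; in the real kernel, ids >= num_segments
// trigger the OP_REQUIRES error shown in the diff.
std::vector<float> UnsortedSegmentSum(const std::vector<float>& data,
                                      const std::vector<int64_t>& segment_ids,
                                      int64_t num_segments, int64_t inner_dim) {
  // InitialValueF() is 0 for a Sum reduction.
  std::vector<float> output(num_segments * inner_dim, 0.0f);
  const int64_t N = static_cast<int64_t>(segment_ids.size());
  for (int64_t i = 0; i < N; ++i) {
    const int64_t j = segment_ids[i];
    if (j < 0) continue;  // negative ids are dropped
    for (int64_t k = 0; k < inner_dim; ++k) {
      output[j * inner_dim + k] += data[i * inner_dim + k];
    }
  }
  return output;
}

int main() {
  // Three input rows of width 2, reduced into two output rows.
  const std::vector<float> data = {1, 2, 3, 4, 5, 6};
  const std::vector<int64_t> ids = {0, 1, 0};
  for (float v : UnsortedSegmentSum(data, ids, /*num_segments=*/2,
                                    /*inner_dim=*/2)) {
    std::cout << v << ' ';  // prints: 6 8 3 4
  }
  std::cout << '\n';
}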
@@ -405,76 +390,8 @@ struct UnsortedSegmentFunctor<CPUDevice, T, Index, InitialValueF, ReductionF> {
                   errors::InvalidArgument(
                       "segment_ids", SliceDebugString(segment_ids_shape, i),
                       " = ", j, " is out of range [0, ", num_segments, ")"));
-      if (row_counter[j] == 0) {
-        num_reductions++;
-      }
-      row_counter[j]++;
+      reduction(data_flat.template chip<0>(i), output.template chip<0>(j));
     }
-    DCHECK_LE(num_reductions, num_segments);
-
-    // Nothing to reduce; all output values equal InitialValueF().
-    if (num_reductions == 0) return;
-
-    // Estimate the task size and block count for the shard function using
-    // two rules:
-    // 1. Each task covers at least 64 rows, so every thread gets enough
-    //    work to do.
-    // 2. The total number of blocks must not exceed num_reductions or
-    //    num_threads.
-    const int64 min_block_size = 64;
-    int64 block_num = std::min(num_reductions, num_threads);
-    int64 block_size = (N - 1) / block_num + 1;
-    if (block_size < min_block_size) {
-      block_size = min_block_size;
-      block_num = (N - 1) / min_block_size + 1;
-    }
-
-    // Compute the real task size for each block and record the boundaries.
-    // 'block_range[0]' stays 0 because the shard function needs a start
-    // index.
-    int64 next_block_idx = 1;
-    std::vector<Index> block_range(block_num + 1, 0);
-    for (int64 i = 0, cur_count = 0; i < num_segments; i++) {
-      cur_count += row_counter[i];
-      // Keep adding segments to the current block until it reaches the
-      // estimated size.
-      if (cur_count < block_size) {
-        continue;
-      } else {
-        block_range[next_block_idx] = i + 1;
-        // Exit once all segments have been assigned; otherwise advance to
-        // the next block and start filling it.
-        if (block_range[next_block_idx] == num_segments) {
-          break;
-        } else {
-          next_block_idx++;
-          cur_count = 0;
-        }
-      }
-    }
-    // Reset 'block_num' to the real count; the last block ends at
-    // num_segments.
-    block_range[next_block_idx] = num_segments;
-    block_num = next_block_idx;
-
-    auto reductionWorker = [&](int64 begin, int64 end) -> void {
-      // Traverse all inputs.
-      for (int64 i = 0; i < N; i++) {
-        // Get the output index j corresponding to input i.
-        Index j = internal::SubtleMustCopy(segment_ids(i));
-        // If j falls within this worker's range, do the reduction.
-        if (j >= block_range[begin] && j < block_range[end]) {
-          reduction(data_flat.template chip<0>(i), output.template chip<0>(j));
-        }
-      }
-    };
-
-    // Reduction functors include Sum, Max, Min, etc.; assume each operation
-    // costs roughly 5 cycles.
-    const int64 compute_cycles = 5 * (N - num_reductions) * inner_dim;
-    const int64 output_bytes = num_reductions * inner_dim * sizeof(T);
-    const Eigen::TensorOpCost cost(data_size * sizeof(T), output_bytes,
-                                   compute_cycles);
-
-    // Submit jobs to the intra-op thread pool.
-    cpu_device.parallelFor(block_num, cost, reductionWorker);
   }
 };
 
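The removed code partitions output segments into contiguous blocks by cumulative input-row counts, so each shard of the parallelFor gets a roughly equal amount of work. A standalone sketch of that partitioning step, simplified under the assumption that row_counter has already been filled (the free function PartitionSegments is a hypothetical extraction; the original inlines this logic in the functor):

#include <algorithm>
#include <cstdint>
#include <iostream>
#include <vector>

// Returns block boundaries over segment indices: block b covers output
// segments [block_range[b], block_range[b + 1]). Boundaries are placed so
// that each block's cumulative input-row count reaches ~block_size.
std::vector<int64_t> PartitionSegments(const std::vector<int64_t>& row_counter,
                                       int64_t N, int64_t num_threads) {
  const int64_t num_segments = static_cast<int64_t>(row_counter.size());
  int64_t num_reductions = 0;
  for (int64_t c : row_counter) num_reductions += (c > 0) ? 1 : 0;
  if (num_reductions == 0 || N == 0) return {0};  // nothing to reduce

  // Rule 1: each block covers at least 64 input rows; rule 2: no more
  // blocks than num_reductions or num_threads.
  const int64_t min_block_size = 64;
  int64_t block_num = std::min(num_reductions, num_threads);
  int64_t block_size = (N - 1) / block_num + 1;
  if (block_size < min_block_size) {
    block_size = min_block_size;
    block_num = (N - 1) / min_block_size + 1;
  }

  std::vector<int64_t> block_range(block_num + 1, 0);
  int64_t next_block_idx = 1;
  int64_t cur_count = 0;
  for (int64_t i = 0; i < num_segments; ++i) {
    cur_count += row_counter[i];
    if (cur_count < block_size) continue;
    block_range[next_block_idx] = i + 1;  // boundary after segment i
    if (block_range[next_block_idx] == num_segments) break;
    ++next_block_idx;
    cur_count = 0;
  }
  block_range[next_block_idx] = num_segments;  // last block ends here
  block_range.resize(next_block_idx + 1);      // trim unused slots
  return block_range;
}

int main() {
  // 200 input rows spread over 4 segments, 2 worker threads.
  std::vector<int64_t> counts = {80, 50, 40, 30};
  for (int64_t b : PartitionSegments(counts, /*N=*/200, /*num_threads=*/2))
    std::cout << b << ' ';  // prints: 0 2 4
  std::cout << '\n';
}

Because boundaries are drawn between output segments, no two shards ever write to the same output row, which is why the removed code needed no atomics or locks around the reduction.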
@@ -676,6 +593,7 @@ REGISTER_COMPLEX_CPU_UNSORTED_KERNELS_ALL(complex128);
   REGISTER_SUM_GPU_UNSORTED_KERNELS(type, int32); \
   REGISTER_SUM_GPU_UNSORTED_KERNELS(type, int64);


 TF_CALL_GPU_NUMBER_TYPES(REGISTER_REAL_GPU_UNSORTED_KERNELS_ALL);
+TF_CALL_int32(REGISTER_REAL_GPU_UNSORTED_KERNELS_ALL);
 TF_CALL_GPU_NUMBER_TYPES(REGISTER_SUM_GPU_UNSORTED_KERNELS_ALL);
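For readers unfamiliar with these lines: the TF_CALL_* macros are type dispatchers that apply the given registration macro once per type in their list. A simplified, self-contained illustration of the pattern (all names here are hypothetical stand-ins, not the TensorFlow macros):

#include <iostream>

// Stand-ins for TensorFlow's TF_CALL_* type-dispatch macros: each applies
// its argument macro m once per type in a fixed list.
#define CALL_GPU_NUMBER_TYPES(m) m(float) m(double)
#define CALL_int32(m) m(int)

// Per-type "registration" macro; TensorFlow's REGISTER_* macros instantiate
// and register a kernel here instead of printing.
#define REGISTER_KERNEL(T)                          \
  static const bool registered_##T = [] {           \
    std::cout << "registering kernel for " #T "\n"; \
    return true;                                    \
  }();

CALL_GPU_NUMBER_TYPES(REGISTER_KERNEL)
CALL_int32(REGISTER_KERNEL)

int main() {}  // registration happens via the static initializers above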
@@ -166,50 +166,4 @@ static void BM_SparseSegmentMeanGrad_High(int iters, int size) {
 BENCHMARK(BM_SparseSegmentMeanGrad_Low)->Arg(1000)->Arg(100000);
 BENCHMARK(BM_SparseSegmentMeanGrad_High)->Arg(1000)->Arg(100000);

-static void BM_UnsortedSegmentReduction(int iters, const string& reduction,
-                                        int num_rows, int num_cols,
-                                        int segment_size) {
-  testing::StopTiming();
-  Graph* g = new Graph(OpRegistry::Global());
-
-  // Create inputs.
-  TensorShape shape1({num_rows, num_cols});
-  Tensor input(DT_FLOAT, shape1);
-  input.flat<float>().setRandom();
-
-  TensorShape shape2({num_rows});
-  Tensor indices(DT_INT32, shape2);
-  test::FillFn<int>(&indices,
-                    [&segment_size](int i) -> int { return i % segment_size; });
-
-  Tensor num_segments(DT_INT32, TensorShape({}));
-  num_segments.scalar<int>()() = segment_size;
-
-  Node* node;
-  TF_CHECK_OK(NodeBuilder(g->NewName("n"), reduction)
-                  .Input(test::graph::Constant(g, input))
-                  .Input(test::graph::Constant(g, indices))
-                  .Input(test::graph::Constant(g, num_segments))
-                  .Attr("T", DT_FLOAT)
-                  .Finalize(g, &node));
-
-  testing::UseRealTime();
-  testing::BytesProcessed(static_cast<int64>(iters) * (num_rows * num_cols) *
-                          sizeof(float));
-  testing::StartTiming();
-  test::Benchmark("cpu", g).Run(iters);
-}
-
-#define BM_UnsortedReduce(O, R, C, S)                  \
-  static void BM_##O##_##R##_##C##_##S##_(int iters) { \
-    BM_UnsortedSegmentReduction(iters, #O, R, C, S);   \
-  }                                                    \
-  BENCHMARK(BM_##O##_##R##_##C##_##S##_);
-
-#define BM_UnsortedReduce_Arg(R, C, S) \
-  BM_UnsortedReduce(UnsortedSegmentSum, R, C, S);
-
-BM_UnsortedReduce_Arg(4096, 128, 1);
-BM_UnsortedReduce_Arg(4096, 128, 128);
-
 } // namespace tensorflow
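The deleted benchmark drove the CPU kernel through a small graph with a 4096x128 float input and segment sizes 1 and 128, reporting bytes processed. A rough, framework-free sketch of the measurement it performed (std::chrono in place of the testing:: harness; the shapes match the old benchmark, but the loop structure is illustrative):

#include <algorithm>
#include <chrono>
#include <cstdint>
#include <iostream>
#include <vector>

int main() {
  // Same shapes as the removed BM_UnsortedReduce_Arg(4096, 128, 128) case.
  const int64_t num_rows = 4096, num_cols = 128, segment_size = 128;
  std::vector<float> data(num_rows * num_cols, 1.0f);
  std::vector<float> output(segment_size * num_cols, 0.0f);

  const int iters = 100;
  const auto start = std::chrono::steady_clock::now();
  for (int it = 0; it < iters; ++it) {
    std::fill(output.begin(), output.end(), 0.0f);
    for (int64_t i = 0; i < num_rows; ++i) {
      const int64_t j = i % segment_size;  // same id pattern as the old test
      for (int64_t k = 0; k < num_cols; ++k)
        output[j * num_cols + k] += data[i * num_cols + k];
    }
  }
  const auto end = std::chrono::steady_clock::now();

  // Report throughput the way the old benchmark did: bytes processed.
  const double secs = std::chrono::duration<double>(end - start).count();
  const double bytes = double(iters) * num_rows * num_cols * sizeof(float);
  std::cout << (bytes / secs / 1e9) << " GB/s processed\n";
}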