Quantiles TFT Analyzer and Mapper.
Added a Bucketize op suitable for working with TFT.

PiperOrigin-RevId: 163994906
parent 5951ab51a9
commit 6b3cb17b00
@@ -874,4 +874,57 @@ class QuantilesOp : public OpKernel {

REGISTER_KERNEL_BUILDER(Name("Quantiles").Device(DEVICE_CPU), QuantilesOp);

template <typename T>
class BucketizeWithInputBoundariesOp : public OpKernel {
 public:
  explicit BucketizeWithInputBoundariesOp(OpKernelConstruction* context)
      : OpKernel(context) {}

  void Compute(OpKernelContext* context) override {
    const Tensor& boundaries_tensor = context->input(1);
    VLOG(1) << "boundaries has shape: "
            << boundaries_tensor.shape().DebugString();
    auto boundaries = boundaries_tensor.flat<float>();
    boundaries_.clear();
    for (size_t i = 0; i < boundaries.size(); i++) {
      boundaries_.push_back(boundaries(i));
      VLOG(1) << "boundaries(" << i << ") : " << boundaries(i);
    }
    OP_REQUIRES(context,
                std::is_sorted(boundaries_.begin(), boundaries_.end()),
                errors::InvalidArgument("Expected sorted boundaries"));

    const Tensor& input_tensor = context->input(0);
    auto input = input_tensor.flat<T>();

    Tensor* output_tensor = nullptr;
    OP_REQUIRES_OK(context, context->allocate_output(0, input_tensor.shape(),
                                                     &output_tensor));
    auto output = output_tensor->template flat<int32>();

    for (size_t i = 0; i < input.size(); i++) {
      output(i) = CalculateBucketIndex(input(i));
    }
  }

 private:
  int32 CalculateBucketIndex(const T value) {
    auto first_bigger_it =
        std::upper_bound(boundaries_.begin(), boundaries_.end(), value);
    return first_bigger_it - boundaries_.begin();
  }
  std::vector<T> boundaries_;
};

#define REGISTER_KERNEL(T)                                     \
  REGISTER_KERNEL_BUILDER(Name("BucketizeWithInputBoundaries") \
                              .Device(DEVICE_CPU)              \
                              .TypeConstraint<T>("T"),         \
                          BucketizeWithInputBoundariesOp<T>);

REGISTER_KERNEL(int32);
REGISTER_KERNEL(int64);
REGISTER_KERNEL(float);
REGISTER_KERNEL(double);
#undef REGISTER_KERNEL

}  // namespace tensorflow
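Note on the kernel above: the bucket index produced by CalculateBucketIndex is the position returned by std::upper_bound, i.e. the number of boundaries that are less than or equal to the value. A minimal Python sketch of that rule (illustration only, not part of this commit), checked against the example in the op documentation below:

import bisect

def bucket_index(value, boundaries):
  # Mirrors CalculateBucketIndex: number of sorted boundaries <= value.
  return bisect.bisect_right(boundaries, value)

boundaries = [0.0, 10.0, 100.0]
values = [-5, 10000, 150, 10, 5, 100]
print([bucket_index(v, boundaries) for v in values])  # [0, 3, 3, 2, 1, 3]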
@@ -286,5 +286,33 @@ sparse_quantiles: Rank 1 tensors representing associated quantiles for each of
the sparse feature tensors.
)doc");

REGISTER_OP("BucketizeWithInputBoundaries")
    .Input("input: T")
    .Input("boundaries: float")
    .Output("output: int32")
    .Attr("T: {int32, int64, float, double}")
    .SetShapeFn(shape_inference::UnchangedShape)
    .Doc(R"doc(
Bucketizes 'input' based on 'boundaries'. This op is similar to the Bucketize
op in core math_ops, except that the boundaries are supplied as an input tensor
rather than as a fixed attribute of Bucketize().

For example, if the inputs are
  boundaries = [0, 10, 100]
  input = [[-5, 10000]
           [150, 10]
           [5, 100]]

then the output will be
  output = [[0, 3]
            [3, 2]
            [1, 3]]

input: A Tensor of any shape containing numeric values.
boundaries: A rank-1 Tensor of sorted floats specifying the bucket boundaries.
output: Same shape as 'input'; each value of 'input' is replaced with its
corresponding bucket index.
)doc");

}  // namespace gtflow
}  // namespace tensorflow
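For illustration only (not part of the diff), the documented example can be reproduced from Python through the bucketize_with_input_boundaries wrapper exercised by the tests below; the quantile_ops import path is an assumption based on that test module.

# Hypothetical usage sketch; module path assumed from the contrib boosted_trees tests.
import tensorflow as tf
from tensorflow.contrib.boosted_trees.python.ops import quantile_ops

with tf.Session():
  buckets = quantile_ops.bucketize_with_input_boundaries(
      input=[[-5, 10000], [150, 10], [5, 100]],
      boundaries=[0., 10., 100.])
  print(buckets.eval())  # Per the op doc: [[0 3] [3 2] [1 3]]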
@@ -81,6 +81,48 @@ class QuantileBucketsOpTest(test_util.TensorFlowTestCase):
    self.assertAllEqual([1, 3, 5], dense_buckets[0].eval())
    self.assertAllEqual([2, 4, 6.], sparse_buckets[0].eval())

  def testStreamingQuantileBucketsWithVaryingBatch(self):
    """Sets up the quantile summary op test as follows.

    Creates batches of examples with a different number of inputs in each
    batch. The input values are dense in the range [1 ... N].
    The data looks like this:
    | Batch | Start | InputList
    |   1   |   1   | [1]
    |   2   |   2   | [2, 3]
    |   3   |   4   | [4, 5, 6]
    |   4   |   7   | [7, 8, 9, 10]
    |   5   |  11   | [11, 12, 13, 14, 15]
    |   6   |  16   | [16, 17, 18, 19, 20, 21]
    """

    with self.test_session() as sess:
      accumulator = quantile_ops.QuantileAccumulator(
          init_stamp_token=0, num_quantiles=3, epsilon=0.001, name="q1")
      resources.initialize_resources(resources.shared_resources()).run()
      input_column = array_ops.placeholder(dtypes.float32)
      weights = array_ops.placeholder(dtypes.float32)
      update = accumulator.add_summary(
          stamp_token=0,
          column=input_column,
          example_weights=weights)

    with self.test_session() as sess:
      for i in range(1, 23):
        # start = 1, 2, 4, 7, 11, 16 ... (see comment above)
        start = int((i * (i - 1) / 2) + 1)
        sess.run(update,
                 {input_column: range(start, start + i),
                  weights: [1] * i})

    with self.test_session() as sess:
      sess.run(accumulator.flush(stamp_token=0, next_stamp_token=1))
      are_ready_flush, buckets = (accumulator.get_buckets(stamp_token=1))
      buckets, are_ready_flush = (sess.run(
          [buckets, are_ready_flush]))
      self.assertEqual(True, are_ready_flush)
      self.assertAllEqual([1, 86., 170., 253.], buckets)

  def testStreamingQuantileBuckets(self):
    """Sets up the quantile summary op test as follows.
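A quick sanity check on the varying-batch test above (illustration only, not part of the commit): the 22 batches stream the integers 1 through 253 with unit weights, so with num_quantiles=3 the flushed boundaries are expected to sit near the minimum, the one-third and two-thirds ranks, and the maximum of that range, consistent with the asserted [1, 86., 170., 253.].

n = sum(range(1, 23))        # batch sizes 1..22 -> 253 streamed values
print(n)                     # 253
print(n / 3.0, 2 * n / 3.0)  # ~84.3 and ~168.7; the accumulator's approximate
                             # boundaries asserted in the test are [1, 86, 170, 253]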
@@ -393,6 +435,29 @@ class QuantilesOpTest(test_util.TensorFlowTestCase):
      # Sparse feature 2
      self.assertAllEqual([0, 0], sparse_quantiles[2].eval())

  def testBucketizeWithInputBoundaries(self):
    with self.test_session():
      buckets = quantile_ops.bucketize_with_input_boundaries(
          input=[1, 2, 3, 4, 5],
          boundaries=[3])
      self.assertAllEqual([0, 0, 1, 1, 1], buckets.eval())

  def testBucketizeWithInputBoundaries2(self):
    with self.test_session():
      boundaries = constant_op.constant([3], dtype=dtypes.float32)
      buckets = quantile_ops.bucketize_with_input_boundaries(
          input=[1, 2, 3, 4, 5],
          boundaries=boundaries)
      self.assertAllEqual([0, 0, 1, 1, 1], buckets.eval())

  def testBucketizeWithInputBoundaries3(self):
    with self.test_session():
      b = array_ops.placeholder(dtypes.float32)
      buckets = quantile_ops.bucketize_with_input_boundaries(
          input=[1, 2, 3, 4, 5],
          boundaries=b)
      self.assertAllEqual([0, 1, 1, 2, 2],
                          buckets.eval(feed_dict={b: [2, 4]}))


if __name__ == "__main__":
  googletest.main()