[tf.data service] Add compression utils for dataset elements.

PiperOrigin-RevId: 301721552
Change-Id: I359ffe13df37e53e8cc217dbc05fa696ddbc1f35
Authored by Andrew Audibert on 2020-03-18 19:41:09 -07:00; committed by TensorFlower Gardener
parent 7fe72602b4
commit 60fcd57c93
4 changed files with 283 additions and 0 deletions

tensorflow/core/data/service/BUILD

@@ -4,6 +4,10 @@ load(
"tf_additional_all_protos",
"tf_proto_library",
)
load(
"//tensorflow:tensorflow.bzl",
"tf_cc_test",
)
package(
default_visibility = [
@@ -41,6 +45,39 @@ tf_proto_library(
],
)
cc_library(
name = "compression_utils",
srcs = ["compression_utils.cc"],
hdrs = [
"compression_utils.h",
],
deps = [
":common_proto_cc",
"//tensorflow/core:core_cpu",
"//tensorflow/core:core_cpu_internal",
"//tensorflow/core:framework_internal",
"//tensorflow/core:lib",
"//tensorflow/core:lib_internal",
"//tensorflow/core:protos_all_cc",
"//tensorflow/core/profiler/lib:traceme",
"@com_google_absl//absl/memory",
],
)
tf_cc_test(
name = "compression_utils_test",
srcs = ["compression_utils_test.cc"],
deps = [
":compression_utils",
"//tensorflow/core:lib",
"//tensorflow/core:lib_internal",
"//tensorflow/core:test",
"//tensorflow/core:test_main",
"//tensorflow/core:testlib",
"//tensorflow/core/kernels/data:dataset_test_base",
],
)
cc_grpc_library(
name = "master_cc_grpc_proto",
srcs = [":master_proto"],

tensorflow/core/data/service/compression_utils.cc

@@ -0,0 +1,151 @@
/* Copyright 2020 The TensorFlow Authors. All Rights Reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
==============================================================================*/
#include "tensorflow/core/data/service/compression_utils.h"
#include "tensorflow/core/common_runtime/dma_helper.h"
#include "tensorflow/core/framework/tensor.pb.h"
#include "tensorflow/core/platform/snappy.h"
#include "tensorflow/core/profiler/lib/traceme.h"
namespace tensorflow {
namespace data {
namespace service_util {
Status Compress(const std::vector<Tensor>& element, CompressedElement* out) {
tensorflow::profiler::TraceMe activity(
"Compress", tensorflow::profiler::TraceMeLevel::kInfo);
// Step 1: Determine the total uncompressed size. This requires serializing
// non-memcopyable tensors, which we save to use again later.
std::vector<TensorProto> non_memcpy_components;
int64 total_size = 0;
for (auto& component : element) {
if (DataTypeCanUseMemcpy(component.dtype())) {
// Some datatypes can be memcopied, allowing us to save two copies
// (AsProtoTensorContent and SerializeToArray).
total_size += DMAHelper::buffer(&component)->size();
} else {
non_memcpy_components.emplace_back();
component.AsProtoTensorContent(&non_memcpy_components.back());
total_size += non_memcpy_components.back().ByteSizeLong();
}
}
// Step 2: Write the tensor data to a buffer, and compress that buffer.
// We use tstring for access to resize_uninitialized.
tstring uncompressed;
uncompressed.resize_uninitialized(total_size);
// Position in `uncompressed` to write the next component.
char* position = uncompressed.mdata();
int non_memcpy_component_index = 0;
for (auto& component : element) {
ComponentMetadata* metadata = out->mutable_component_metadata()->Add();
metadata->set_dtype(component.dtype());
component.shape().AsProto(metadata->mutable_tensor_shape());
if (DataTypeCanUseMemcpy(component.dtype())) {
const TensorBuffer* buffer = DMAHelper::buffer(&component);
memcpy(position, buffer->data(), buffer->size());
metadata->set_tensor_size_bytes(buffer->size());
} else {
TensorProto& proto = non_memcpy_components[non_memcpy_component_index++];
proto.SerializeToArray(position, proto.ByteSizeLong());
metadata->set_tensor_size_bytes(proto.ByteSizeLong());
}
position += metadata->tensor_size_bytes();
}
DCHECK_EQ(position, uncompressed.mdata() + total_size);
if (!port::Snappy_Compress(uncompressed.mdata(), total_size,
out->mutable_data())) {
return errors::Internal("Failed to compress using snappy.");
}
return Status::OK();
}
Status Uncompress(const CompressedElement& compressed,
std::vector<Tensor>* out) {
tensorflow::profiler::TraceMe activity(
"Uncompress", tensorflow::profiler::TraceMeLevel::kInfo);
int num_components = compressed.component_metadata_size();
out->clear();
out->reserve(num_components);
// Step 1: Prepare the memory that we will uncompress into.
std::vector<struct iovec> iov(num_components);
// We use tstring for access to resize_uninitialized.
std::vector<tstring> tensor_proto_strs;
// num_components is a conservative estimate. It is important to reserve
// vector space so that the vector doesn't resize itself, which could
// invalidate pointers to its strings' data.
tensor_proto_strs.reserve(num_components);
int64 total_size = 0;
for (int i = 0; i < num_components; ++i) {
const ComponentMetadata& metadata = compressed.component_metadata(i);
if (DataTypeCanUseMemcpy(metadata.dtype())) {
out->emplace_back(metadata.dtype(), metadata.tensor_shape());
TensorBuffer* buffer = DMAHelper::buffer(&out->back());
iov[i].iov_base = buffer->data();
iov[i].iov_len = buffer->size();
} else {
// Allocate an empty Tensor. We will fill it out later after
// uncompressing into the tensor_proto_str.
out->emplace_back();
tensor_proto_strs.emplace_back();
tstring& tensor_proto_str = tensor_proto_strs.back();
tensor_proto_str.resize_uninitialized(metadata.tensor_size_bytes());
iov[i].iov_base = tensor_proto_str.mdata();
iov[i].iov_len = tensor_proto_str.size();
}
total_size += iov[i].iov_len;
}
// Step 2: Uncompress into the iovec.
const std::string& compressed_data = compressed.data();
size_t uncompressed_size;
if (!port::Snappy_GetUncompressedLength(
compressed_data.data(), compressed_data.size(), &uncompressed_size)) {
return errors::Internal("Could not get snappy uncompressed length");
}
if (uncompressed_size != total_size) {
return errors::Internal(
"Uncompressed size mismatch. Snappy expects ", uncompressed_size,
" whereas the tensor metadata suggests ", total_size);
}
if (!port::Snappy_UncompressToIOVec(compressed_data.data(),
compressed_data.size(), iov.data(),
num_components)) {
return errors::Internal("Failed to perform snappy decompression.");
}
// Step 3: Deserialize tensor proto strings to tensors.
int tensor_proto_strs_index = 0;
for (int i = 0; i < num_components; ++i) {
if (DataTypeCanUseMemcpy(compressed.component_metadata(i).dtype())) {
continue;
}
TensorProto tp;
if (!tp.ParseFromString(tensor_proto_strs[tensor_proto_strs_index++])) {
return errors::Internal("Could not parse TensorProto");
}
if (!out->at(i).FromProto(tp)) {
return errors::Internal("Could not parse Tensor");
}
}
return Status::OK();
}
} // namespace service_util
} // namespace data
} // namespace tensorflow
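
The implementation above leans on two platform primitives: the element's components are serialized back to back into a single buffer that is snappy-compressed, and on the way back Snappy_UncompressToIOVec scatters the decompressed bytes directly into per-tensor destination buffers, avoiding an intermediate copy. Below is a minimal, self-contained sketch of that pattern; it is not part of the commit, the buffers and the RoundTripTwoBuffers function are made up for illustration, and only the port::Snappy_* calls are the same ones used above.

#include <string>

#include "tensorflow/core/platform/snappy.h"

namespace {

bool RoundTripTwoBuffers() {
  const std::string a = "first component bytes";
  const std::string b = "second component bytes";

  // Compress the concatenation of the two "components", mirroring how
  // Compress() lays every component into one buffer before compressing.
  std::string uncompressed = a + b;
  std::string compressed;
  if (!tensorflow::port::Snappy_Compress(uncompressed.data(),
                                         uncompressed.size(), &compressed)) {
    return false;
  }

  // Decompress directly into two separate destination buffers, the way
  // Uncompress() scatters into per-tensor buffers via an iovec.
  std::string dst_a(a.size(), '\0');
  std::string dst_b(b.size(), '\0');
  struct iovec iov[2];
  iov[0].iov_base = &dst_a[0];
  iov[0].iov_len = dst_a.size();
  iov[1].iov_base = &dst_b[0];
  iov[1].iov_len = dst_b.size();
  return tensorflow::port::Snappy_UncompressToIOVec(
      compressed.data(), compressed.size(), iov, 2);
}

}  // namespace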

tensorflow/core/data/service/compression_utils.h

@@ -0,0 +1,40 @@
/* Copyright 2020 The TensorFlow Authors. All Rights Reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
==============================================================================*/
#ifndef TENSORFLOW_CORE_DATA_SERVICE_COMPRESSION_UTILS_H_
#define TENSORFLOW_CORE_DATA_SERVICE_COMPRESSION_UTILS_H_
#include "tensorflow/core/common_runtime/dma_helper.h"
#include "tensorflow/core/data/service/common.pb.h"
#include "tensorflow/core/platform/status.h"
namespace tensorflow {
namespace data {
namespace service_util {
// Compresses the components of `element` into the `CompressedElement` proto.
//
// In addition to writing the actual compressed bytes, `Compress` fills
// out the per-component metadata for the `CompressedElement`.
Status Compress(const std::vector<Tensor>& element, CompressedElement* out);
// Uncompresses a `CompressedElement` into a vector of tensor components.
Status Uncompress(const CompressedElement& compressed,
std::vector<Tensor>* out);
} // namespace service_util
} // namespace data
} // namespace tensorflow
#endif // TENSORFLOW_CORE_DATA_SERVICE_COMPRESSION_UTILS_H_
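
For reference, a minimal, hedged sketch of a caller of the API declared above; it is not part of the commit, and the RoundTripExample function and the tensor contents are illustrative only.

#include <vector>

#include "tensorflow/core/data/service/compression_utils.h"
#include "tensorflow/core/framework/tensor.h"

namespace tensorflow {
namespace data {

// Hypothetical caller: compress one element for transport, then recover it.
Status RoundTripExample() {
  Tensor t(DT_INT64, TensorShape({2}));
  t.flat<int64>()(0) = 1;
  t.flat<int64>()(1) = 2;
  std::vector<Tensor> element = {t};

  CompressedElement compressed;
  Status s = service_util::Compress(element, &compressed);
  if (!s.ok()) return s;

  std::vector<Tensor> recovered;
  return service_util::Uncompress(compressed, &recovered);
}

}  // namespace data
}  // namespace tensorflow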

tensorflow/core/data/service/compression_utils_test.cc

@@ -0,0 +1,55 @@
/* Copyright 2020 The TensorFlow Authors. All Rights Reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
==============================================================================*/
#include "tensorflow/core/data/service/compression_utils.h"
#include "tensorflow/core/framework/tensor_testutil.h"
#include "tensorflow/core/kernels/data/dataset_test_base.h"
#include "tensorflow/core/platform/test.h"
namespace tensorflow {
namespace data {
namespace service_util {
class ParameterizedCompressionUtilsTest
: public DatasetOpsTestBase,
public ::testing::WithParamInterface<std::vector<Tensor>> {};
TEST_P(ParameterizedCompressionUtilsTest, RoundTrip) {
std::vector<Tensor> element = GetParam();
CompressedElement compressed;
TF_ASSERT_OK(Compress(element, &compressed));
std::vector<Tensor> round_trip_element;
TF_ASSERT_OK(Uncompress(compressed, &round_trip_element));
TF_EXPECT_OK(
ExpectEqual(element, round_trip_element, /*compare_order=*/true));
}
std::vector<std::vector<Tensor>> TestCases() {
return {
CreateTensors<int64>(TensorShape{1}, {{1}}), // int64
CreateTensors<int64>(TensorShape{1}, {{1}, {2}}), // multiple int64
CreateTensors<tstring>(TensorShape{1}, {{"a"}, {"b"}}), // tstring
{CreateTensor<tstring>(TensorShape{1}, {"a"}),
CreateTensor<int64>(TensorShape{1}, {1})}, // mixed tstring/int64
{}, // empty
};
}
INSTANTIATE_TEST_SUITE_P(Instantiation, ParameterizedCompressionUtilsTest,
::testing::ValuesIn(TestCases()));
} // namespace service_util
} // namespace data
} // namespace tensorflow
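
As a possible follow-up, a hedged sketch of one more round-trip test covering a rank-2 tensor; it is not part of the commit, assumes the same includes and namespaces as compression_utils_test.cc above, and the test name and shape are illustrative only.

TEST(CompressionUtilsSketchTest, RoundTripRank2) {
  // Build a 2x3 int64 tensor with values 0..5.
  Tensor t(DT_INT64, TensorShape({2, 3}));
  for (int i = 0; i < 6; ++i) t.flat<int64>()(i) = i;
  std::vector<Tensor> element = {t};

  CompressedElement compressed;
  TF_ASSERT_OK(Compress(element, &compressed));

  std::vector<Tensor> round_trip;
  TF_ASSERT_OK(Uncompress(compressed, &round_trip));
  ASSERT_EQ(round_trip.size(), 1u);
  test::ExpectTensorEqual<int64>(round_trip[0], t);
}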