diff --git a/tensorflow/core/data/service/BUILD b/tensorflow/core/data/service/BUILD index 6003362406f..68c0f2d47d7 100644 --- a/tensorflow/core/data/service/BUILD +++ b/tensorflow/core/data/service/BUILD @@ -4,6 +4,10 @@ load( "tf_additional_all_protos", "tf_proto_library", ) +load( + "//tensorflow:tensorflow.bzl", + "tf_cc_test", +) package( default_visibility = [ @@ -41,6 +45,39 @@ tf_proto_library( ], ) +cc_library( + name = "compression_utils", + srcs = ["compression_utils.cc"], + hdrs = [ + "compression_utils.h", + ], + deps = [ + ":common_proto_cc", + "//tensorflow/core:core_cpu", + "//tensorflow/core:core_cpu_internal", + "//tensorflow/core:framework_internal", + "//tensorflow/core:lib", + "//tensorflow/core:lib_internal", + "//tensorflow/core:protos_all_cc", + "//tensorflow/core/profiler/lib:traceme", + "@com_google_absl//absl/memory", + ], +) + +tf_cc_test( + name = "compression_utils_test", + srcs = ["compression_utils_test.cc"], + deps = [ + ":compression_utils", + "//tensorflow/core:lib", + "//tensorflow/core:lib_internal", + "//tensorflow/core:test", + "//tensorflow/core:test_main", + "//tensorflow/core:testlib", + "//tensorflow/core/kernels/data:dataset_test_base", + ], +) + cc_grpc_library( name = "master_cc_grpc_proto", srcs = [":master_proto"], diff --git a/tensorflow/core/data/service/compression_utils.cc b/tensorflow/core/data/service/compression_utils.cc new file mode 100644 index 00000000000..c4a47e1b00e --- /dev/null +++ b/tensorflow/core/data/service/compression_utils.cc @@ -0,0 +1,151 @@ +/* Copyright 2020 The TensorFlow Authors. All Rights Reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. 
+You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +==============================================================================*/ +#include "tensorflow/core/data/service/compression_utils.h" + +#include "tensorflow/core/common_runtime/dma_helper.h" +#include "tensorflow/core/framework/tensor.pb.h" +#include "tensorflow/core/platform/snappy.h" +#include "tensorflow/core/profiler/lib/traceme.h" + +namespace tensorflow { +namespace data { +namespace service_util { + +Status Compress(const std::vector<Tensor>& element, CompressedElement* out) { + tensorflow::profiler::TraceMe activity( + "Compress", tensorflow::profiler::TraceMeLevel::kInfo); + + // Step 1: Determine the total uncompressed size. This requires serializing + // non-memcopyable tensors, which we save to use again later. + std::vector<TensorProto> non_memcpy_components; + int64 total_size = 0; + for (auto& component : element) { + if (DataTypeCanUseMemcpy(component.dtype())) { + // Some datatypes can be memcopied, allowing us to save two copies + // (AsProtoTensorContent and SerializeToArray). + total_size += DMAHelper::buffer(&component)->size(); + } else { + non_memcpy_components.emplace_back(); + component.AsProtoTensorContent(&non_memcpy_components.back()); + total_size += non_memcpy_components.back().ByteSizeLong(); + } + } + + // Step 2: Write the tensor data to a buffer, and compress that buffer. + // We use tstring for access to resize_uninitialized. + tstring uncompressed; + uncompressed.resize_uninitialized(total_size); + // Position in `uncompressed` to write the next component.
+ char* position = uncompressed.mdata(); + int non_memcpy_component_index = 0; + for (auto& component : element) { + ComponentMetadata* metadata = out->mutable_component_metadata()->Add(); + metadata->set_dtype(component.dtype()); + component.shape().AsProto(metadata->mutable_tensor_shape()); + if (DataTypeCanUseMemcpy(component.dtype())) { + const TensorBuffer* buffer = DMAHelper::buffer(&component); + memcpy(position, buffer->data(), buffer->size()); + metadata->set_tensor_size_bytes(buffer->size()); + } else { + TensorProto& proto = non_memcpy_components[non_memcpy_component_index++]; + proto.SerializeToArray(position, proto.ByteSizeLong()); + metadata->set_tensor_size_bytes(proto.ByteSizeLong()); + } + position += metadata->tensor_size_bytes(); + } + DCHECK_EQ(position, uncompressed.mdata() + total_size); + + if (!port::Snappy_Compress(uncompressed.mdata(), total_size, + out->mutable_data())) { + return errors::Internal("Failed to compress using snappy."); + } + return Status::OK(); +} + +Status Uncompress(const CompressedElement& compressed, + std::vector<Tensor>* out) { + tensorflow::profiler::TraceMe activity( + "Uncompress", tensorflow::profiler::TraceMeLevel::kInfo); + int num_components = compressed.component_metadata_size(); + out->clear(); + out->reserve(num_components); + + // Step 1: Prepare the memory that we will uncompress into. + std::vector<struct iovec> iov(num_components); + // We use tstring for access to resize_uninitialized. + std::vector<tstring> tensor_proto_strs; + // num_components is a conservative estimate. It is important to reserve + // vector space so that the vector doesn't resize itself, which could + // invalidate pointers to its strings' data.
+ tensor_proto_strs.reserve(num_components); + int64 total_size = 0; + for (int i = 0; i < num_components; ++i) { + const ComponentMetadata& metadata = compressed.component_metadata(i); + if (DataTypeCanUseMemcpy(metadata.dtype())) { + out->emplace_back(metadata.dtype(), metadata.tensor_shape()); + TensorBuffer* buffer = DMAHelper::buffer(&out->back()); + iov[i].iov_base = buffer->data(); + iov[i].iov_len = buffer->size(); + } else { + // Allocate an empty Tensor. We will fill it out later after + // uncompressing into the tensor_proto_str. + out->emplace_back(); + tensor_proto_strs.emplace_back(); + tstring& tensor_proto_str = tensor_proto_strs.back(); + tensor_proto_str.resize_uninitialized(metadata.tensor_size_bytes()); + iov[i].iov_base = tensor_proto_str.mdata(); + iov[i].iov_len = tensor_proto_str.size(); + } + total_size += iov[i].iov_len; + } + + // Step 2: Uncompress into the iovec. + const std::string& compressed_data = compressed.data(); + size_t uncompressed_size; + if (!port::Snappy_GetUncompressedLength( + compressed_data.data(), compressed_data.size(), &uncompressed_size)) { + return errors::Internal("Could not get snappy uncompressed length"); + } + if (uncompressed_size != total_size) { + return errors::Internal( + "Uncompressed size mismatch. Snappy expects ", uncompressed_size, + " whereas the tensor metadata suggests ", total_size); + } + if (!port::Snappy_UncompressToIOVec(compressed_data.data(), + compressed_data.size(), iov.data(), + num_components)) { + return errors::Internal("Failed to perform snappy decompression."); + } + + // Step 3: Deserialize tensor proto strings to tensors. 
+ int tensor_proto_strs_index = 0; + for (int i = 0; i < num_components; ++i) { + if (DataTypeCanUseMemcpy(compressed.component_metadata(i).dtype())) { + continue; + } + TensorProto tp; + if (!tp.ParseFromString(tensor_proto_strs[tensor_proto_strs_index++])) { + return errors::Internal("Could not parse TensorProto"); + } + if (!out->at(i).FromProto(tp)) { + return errors::Internal("Could not parse Tensor"); + } + } + return Status::OK(); +} + +} // namespace service_util +} // namespace data +} // namespace tensorflow diff --git a/tensorflow/core/data/service/compression_utils.h b/tensorflow/core/data/service/compression_utils.h new file mode 100644 index 00000000000..96698aaaf09 --- /dev/null +++ b/tensorflow/core/data/service/compression_utils.h @@ -0,0 +1,40 @@ +/* Copyright 2020 The TensorFlow Authors. All Rights Reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +==============================================================================*/ +#ifndef TENSORFLOW_CORE_DATA_SERVICE_COMPRESSION_UTILS_H_ +#define TENSORFLOW_CORE_DATA_SERVICE_COMPRESSION_UTILS_H_ + +#include "tensorflow/core/common_runtime/dma_helper.h" +#include "tensorflow/core/data/service/common.pb.h" +#include "tensorflow/core/platform/status.h" + +namespace tensorflow { +namespace data { +namespace service_util { + +// Compresses the components of `element` into the `CompressedElement` proto. 
+// +// In addition to writing the actual compressed bytes, `Compress` fills +// out the per-component metadata for the `CompressedElement`. +Status Compress(const std::vector<Tensor>& element, CompressedElement* out); + +// Uncompresses a `CompressedElement` into a vector of tensor components. +Status Uncompress(const CompressedElement& compressed, + std::vector<Tensor>* out); + +} // namespace service_util +} // namespace data +} // namespace tensorflow + +#endif // TENSORFLOW_CORE_DATA_SERVICE_COMPRESSION_UTILS_H_ diff --git a/tensorflow/core/data/service/compression_utils_test.cc b/tensorflow/core/data/service/compression_utils_test.cc new file mode 100644 index 00000000000..b5da13efeed --- /dev/null +++ b/tensorflow/core/data/service/compression_utils_test.cc @@ -0,0 +1,55 @@ +/* Copyright 2020 The TensorFlow Authors. All Rights Reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License.
+==============================================================================*/ +#include "tensorflow/core/data/service/compression_utils.h" + +#include "tensorflow/core/framework/tensor_testutil.h" +#include "tensorflow/core/kernels/data/dataset_test_base.h" +#include "tensorflow/core/platform/test.h" + +namespace tensorflow { +namespace data { +namespace service_util { + +class ParameterizedCompressionUtilsTest + : public DatasetOpsTestBase, + public ::testing::WithParamInterface<std::vector<Tensor>> {}; + +TEST_P(ParameterizedCompressionUtilsTest, RoundTrip) { + std::vector<Tensor> element = GetParam(); + CompressedElement compressed; + TF_ASSERT_OK(Compress(element, &compressed)); + std::vector<Tensor> round_trip_element; + TF_ASSERT_OK(Uncompress(compressed, &round_trip_element)); + TF_EXPECT_OK( + ExpectEqual(element, round_trip_element, /*compare_order=*/true)); +} + +std::vector<std::vector<Tensor>> TestCases() { + return { + CreateTensors<int64>(TensorShape{1}, {{1}}), // int64 + CreateTensors<int64>(TensorShape{1}, {{1}, {2}}), // multiple int64 + CreateTensors<tstring>(TensorShape{1}, {{"a"}, {"b"}}), // tstring + {CreateTensor<tstring>(TensorShape{1}, {"a"}), + CreateTensor<int64>(TensorShape{1}, {1})}, // mixed tstring/int64 + {}, // empty + }; +} + +INSTANTIATE_TEST_SUITE_P(Instantiation, ParameterizedCompressionUtilsTest, + ::testing::ValuesIn(TestCases())); + +} // namespace service_util +} // namespace data +} // namespace tensorflow