Changing ZlibInputBuffer to ZlibInputStream and making it so that it implements InputStreamInterface.

Change: 132153154
This commit is contained in:
Rohan Jain 2016-09-03 09:51:19 -08:00 committed by TensorFlower Gardener
parent 5a96e4e672
commit 897e2bcf65
7 changed files with 106 additions and 86 deletions

View File

@ -25,7 +25,7 @@ tensorflow/core/lib/random/simple_philox.cc
tensorflow/core/lib/random/random.cc
tensorflow/core/lib/random/distribution_sampler.cc
tensorflow/core/lib/io/zlib_outputbuffer.cc
tensorflow/core/lib/io/zlib_inputbuffer.cc
tensorflow/core/lib/io/zlib_inputstream.cc
tensorflow/core/lib/io/two_level_iterator.cc
tensorflow/core/lib/io/table_builder.cc
tensorflow/core/lib/io/table.cc

View File

@ -891,7 +891,7 @@ cc_library(
"lib/io/snappy/snappy_inputbuffer.h",
"lib/io/snappy/snappy_outputbuffer.h",
"lib/io/zlib_compression_options.h",
"lib/io/zlib_inputbuffer.h",
"lib/io/zlib_inputstream.h",
"lib/io/zlib_outputbuffer.h",
"lib/jpeg/jpeg_handle.h",
"lib/png/png_io.h",

View File

@ -20,6 +20,7 @@ limitations under the License.
#include "tensorflow/core/lib/core/coding.h"
#include "tensorflow/core/lib/core/errors.h"
#include "tensorflow/core/lib/hash/crc32c.h"
#include "tensorflow/core/lib/io/random_inputstream.h"
#include "tensorflow/core/platform/env.h"
namespace tensorflow {
@ -33,8 +34,9 @@ RecordReader::RecordReader(RandomAccessFile* file,
#if defined(IS_SLIM_BUILD)
LOG(FATAL) << "Zlib compression is unsupported on mobile platforms.";
#else // IS_SLIM_BUILD
zlib_input_buffer_.reset(new ZlibInputBuffer(
src_, options.zlib_options.input_buffer_size,
random_input_stream_.reset(new RandomAccessInputStream(file));
zlib_input_stream_.reset(new ZlibInputStream(
random_input_stream_.get(), options.zlib_options.input_buffer_size,
options.zlib_options.output_buffer_size, options.zlib_options));
#endif // IS_SLIM_BUILD
} else if (options.compression_type == RecordReaderOptions::NONE) {
@ -44,7 +46,10 @@ RecordReader::RecordReader(RandomAccessFile* file,
}
}
RecordReader::~RecordReader() {}
// Tears down the decompression streams in dependency order.
//
// Both stream members are declared only when IS_SLIM_BUILD is not defined, so
// the explicit teardown has to sit behind the same guard; otherwise slim
// builds would reference members that do not exist.
RecordReader::~RecordReader() {
#if !defined(IS_SLIM_BUILD)
  // The zlib stream holds a non-owning pointer to random_input_stream_, so it
  // is released first.
  zlib_input_stream_.reset(nullptr);
  random_input_stream_.reset(nullptr);
#endif  // IS_SLIM_BUILD
}
// Read n+4 bytes from file, verify that checksum of first n bytes is
// stored in the last 4 bytes and store the first n bytes in *result.
@ -59,7 +64,7 @@ Status RecordReader::ReadChecksummed(uint64 offset, size_t n,
storage->resize(expected);
#if !defined(IS_SLIM_BUILD)
if (zlib_input_buffer_) {
if (zlib_input_stream_) {
// If we have a zlib compressed buffer, we assume that the
// file is being read sequentially, and we use the underlying
// implementation to read the data.
@ -67,7 +72,7 @@ Status RecordReader::ReadChecksummed(uint64 offset, size_t n,
// No checks are done to validate that the file is being read
// sequentially. At some point the zlib input buffer may support
// seeking, possibly inefficiently.
TF_RETURN_IF_ERROR(zlib_input_buffer_->ReadNBytes(expected, storage));
TF_RETURN_IF_ERROR(zlib_input_stream_->ReadNBytes(expected, storage));
if (storage->size() != expected) {
if (storage->size() == 0) {

View File

@ -18,10 +18,10 @@ limitations under the License.
#include "tensorflow/core/lib/core/status.h"
#include "tensorflow/core/lib/core/stringpiece.h"
#include "tensorflow/core/lib/io/inputbuffer.h"
#if !defined(IS_SLIM_BUILD)
#include "tensorflow/core/lib/io/random_inputstream.h"
#include "tensorflow/core/lib/io/zlib_compression_options.h"
#include "tensorflow/core/lib/io/zlib_inputbuffer.h"
#include "tensorflow/core/lib/io/zlib_inputstream.h"
#endif // IS_SLIM_BUILD
#include "tensorflow/core/platform/macros.h"
#include "tensorflow/core/platform/types.h"
@ -64,7 +64,8 @@ class RecordReader {
RandomAccessFile* src_;
RecordReaderOptions options_;
#if !defined(IS_SLIM_BUILD)
std::unique_ptr<ZlibInputBuffer> zlib_input_buffer_;
std::unique_ptr<RandomAccessInputStream> random_input_stream_;
std::unique_ptr<ZlibInputStream> zlib_input_stream_;
#endif // IS_SLIM_BUILD
TF_DISALLOW_COPY_AND_ASSIGN(RecordReader);

View File

@ -14,13 +14,14 @@ limitations under the License.
==============================================================================*/
#include "tensorflow/core/lib/core/status_test_util.h"
#include "tensorflow/core/lib/io/inputbuffer.h"
#include "tensorflow/core/lib/io/random_inputstream.h"
#include "tensorflow/core/lib/io/zlib_compression_options.h"
#include "tensorflow/core/lib/io/zlib_inputbuffer.h"
#include "tensorflow/core/lib/io/zlib_inputstream.h"
#include "tensorflow/core/lib/io/zlib_outputbuffer.h"
#include "tensorflow/core/lib/strings/strcat.h"
namespace tensorflow {
namespace io {
static std::vector<int> InputBufferSizes() {
return {10, 100, 200, 500, 1000, 10000};
@ -70,8 +71,8 @@ void TestAllCombinations(CompressionOptions input_options,
TF_CHECK_OK(env->NewWritableFile(fname, &file_writer));
string result;
io::ZlibOutputBuffer out(file_writer.get(), input_buf_size,
output_buf_size, output_options);
ZlibOutputBuffer out(file_writer.get(), input_buf_size, output_buf_size,
output_options);
TF_CHECK_OK(out.Write(StringPiece(data)));
TF_CHECK_OK(out.Close());
@ -80,9 +81,11 @@ void TestAllCombinations(CompressionOptions input_options,
std::unique_ptr<RandomAccessFile> file_reader;
TF_CHECK_OK(env->NewRandomAccessFile(fname, &file_reader));
io::ZlibInputBuffer in(file_reader.get(), input_buf_size,
output_buf_size, input_options);
TF_CHECK_OK(in.ReadNBytes(data.size(), &result));
std::unique_ptr<RandomAccessInputStream> input_stream(
new RandomAccessInputStream(file_reader.get()));
ZlibInputStream in(input_stream.get(), input_buf_size, output_buf_size,
input_options);
TF_EXPECT_OK(in.ReadNBytes(data.size(), &result));
EXPECT_EQ(result, data);
}
}
@ -115,8 +118,8 @@ void TestMultipleWrites(uint8 input_buf_size, uint8 output_buf_size,
string expected_result;
TF_CHECK_OK(env->NewWritableFile(fname, &file_writer));
io::ZlibOutputBuffer out(file_writer.get(), input_buf_size, output_buf_size,
output_options);
ZlibOutputBuffer out(file_writer.get(), input_buf_size, output_buf_size,
output_options);
for (int i = 0; i < num_writes; i++) {
TF_CHECK_OK(out.Write(StringPiece(data)));
@ -131,12 +134,14 @@ void TestMultipleWrites(uint8 input_buf_size, uint8 output_buf_size,
std::unique_ptr<RandomAccessFile> file_reader;
TF_CHECK_OK(env->NewRandomAccessFile(fname, &file_reader));
io::ZlibInputBuffer in(file_reader.get(), input_buf_size, output_buf_size,
input_options);
std::unique_ptr<RandomAccessInputStream> input_stream(
new RandomAccessInputStream(file_reader.get()));
ZlibInputStream in(input_stream.get(), input_buf_size, output_buf_size,
input_options);
for (int i = 0; i < num_writes; i++) {
string decompressed_output;
TF_CHECK_OK(in.ReadNBytes(data.size(), &decompressed_output));
TF_EXPECT_OK(in.ReadNBytes(data.size(), &decompressed_output));
strings::StrAppend(&actual_result, decompressed_output);
}
@ -151,7 +156,7 @@ TEST(ZlibBuffers, MultipleWriteCallsWithFlush) {
TestMultipleWrites(200, 200, 10, true);
}
TEST(ZlibInputBuffer, FailsToReadIfWindowBitsAreIncompatible) {
TEST(ZlibInputStream, FailsToReadIfWindowBitsAreIncompatible) {
Env* env = Env::Default();
string fname = testing::TmpDir() + "/zlib_buffers_test";
CompressionOptions output_options = CompressionOptions::DEFAULT();
@ -165,8 +170,8 @@ TEST(ZlibInputBuffer, FailsToReadIfWindowBitsAreIncompatible) {
std::unique_ptr<WritableFile> file_writer;
TF_CHECK_OK(env->NewWritableFile(fname, &file_writer));
string result;
io::ZlibOutputBuffer out(file_writer.get(), input_buf_size, output_buf_size,
output_options);
ZlibOutputBuffer out(file_writer.get(), input_buf_size, output_buf_size,
output_options);
TF_CHECK_OK(out.Write(StringPiece(data)));
TF_CHECK_OK(out.Close());
@ -175,11 +180,14 @@ TEST(ZlibInputBuffer, FailsToReadIfWindowBitsAreIncompatible) {
std::unique_ptr<RandomAccessFile> file_reader;
TF_CHECK_OK(env->NewRandomAccessFile(fname, &file_reader));
io::ZlibInputBuffer in(file_reader.get(), input_buf_size, output_buf_size,
input_options);
std::unique_ptr<RandomAccessInputStream> input_stream(
new RandomAccessInputStream(file_reader.get()));
ZlibInputStream in(input_stream.get(), input_buf_size, output_buf_size,
input_options);
Status read_status = in.ReadNBytes(data.size(), &result);
CHECK_EQ(read_status.code(), error::DATA_LOSS);
CHECK(read_status.error_message().find("inflate() failed") != string::npos);
}
} // namespace io
} // namespace tensorflow

View File

@ -13,20 +13,20 @@ See the License for the specific language governing permissions and
limitations under the License.
==============================================================================*/
#include "tensorflow/core/lib/io/zlib_inputbuffer.h"
#include "tensorflow/core/lib/io/zlib_inputstream.h"
#include "tensorflow/core/lib/strings/strcat.h"
#include "tensorflow/core/platform/logging.h"
namespace tensorflow {
namespace io {
ZlibInputBuffer::ZlibInputBuffer(
RandomAccessFile* file,
ZlibInputStream::ZlibInputStream(
InputStreamInterface* input_stream,
size_t input_buffer_bytes, // size of z_stream.next_in buffer
size_t output_buffer_bytes, // size of z_stream.next_out buffer
const ZlibCompressionOptions& zlib_options)
: file_(file),
file_pos_(0),
: input_stream_(input_stream),
input_buffer_capacity_(input_buffer_bytes),
output_buffer_capacity_(output_buffer_bytes),
z_stream_input_(new Bytef[input_buffer_capacity_]),
@ -48,19 +48,19 @@ ZlibInputBuffer::ZlibInputBuffer(
} else {
z_stream_->next_in = z_stream_input_.get();
z_stream_->next_out = z_stream_output_.get();
next_unread_byte_ = (char*)z_stream_output_.get();
next_unread_byte_ = reinterpret_cast<char*>(z_stream_output_.get());
z_stream_->avail_in = 0;
z_stream_->avail_out = output_buffer_capacity_;
}
}
ZlibInputBuffer::~ZlibInputBuffer() {
// Hands the z_stream back to zlib via inflateEnd(), provided one is held.
ZlibInputStream::~ZlibInputStream() {
  z_stream* const stream = z_stream_.get();
  if (stream != nullptr) {
    inflateEnd(stream);
  }
}
Status ZlibInputBuffer::ReadFromFile() {
Status ZlibInputStream::ReadFromStream() {
int bytes_to_read = input_buffer_capacity_;
char* read_location = reinterpret_cast<char*>(z_stream_input_.get());
@ -77,12 +77,12 @@ Status ZlibInputBuffer::ReadFromFile() {
bytes_to_read -= z_stream_->avail_in;
read_location += z_stream_->avail_in;
}
StringPiece data;
string data;
// Try to read enough data to fill up z_stream_input_.
Status s = file_->Read(file_pos_, bytes_to_read, &data, read_location);
if (data.data() != read_location) {
memmove(read_location, data.data(), data.size());
}
// TODO(rohanj): Add a char* version of ReadNBytes to InputStreamInterface
// and use that instead to make this more efficient.
Status s = input_stream_->ReadNBytes(bytes_to_read, &data);
memcpy(read_location, data.data(), data.size());
// Since we moved unread data to the head of the input stream we can point
// next_in to the head of the input stream.
@ -90,16 +90,15 @@ Status ZlibInputBuffer::ReadFromFile() {
// Note: data.size() could be different from bytes_to_read.
z_stream_->avail_in += data.size();
file_pos_ += data.size();
if (!s.ok() && !errors::IsOutOfRange(s)) {
return s;
}
// We throw OutOfRange error iff no new data has been read from file.
// Since we never check how much data is remaining in the file, it is
// possible that on the last read there isn't enough data in the file to
// fill up the buffer in which case file_->ReadNBytes would return an
// We throw OutOfRange error iff no new data has been read from stream.
// Since we never check how much data is remaining in the stream, it is
// possible that on the last read there isn't enough data in the stream to
// fill up the buffer in which case input_stream_->ReadNBytes would return an
// OutOfRange error.
if (data.size() == 0) {
return errors::OutOfRange("EOF reached");
@ -111,9 +110,10 @@ Status ZlibInputBuffer::ReadFromFile() {
return s;
}
size_t ZlibInputBuffer::ReadBytesFromCache(size_t bytes_to_read,
size_t ZlibInputStream::ReadBytesFromCache(size_t bytes_to_read,
string* result) {
size_t unread_bytes = (char*)z_stream_->next_out - next_unread_byte_;
size_t unread_bytes =
reinterpret_cast<char*>(z_stream_->next_out) - next_unread_byte_;
size_t can_read_bytes = std::min(bytes_to_read, unread_bytes);
if (can_read_bytes > 0) {
result->append(next_unread_byte_, can_read_bytes);
@ -122,34 +122,35 @@ size_t ZlibInputBuffer::ReadBytesFromCache(size_t bytes_to_read,
return can_read_bytes;
}
size_t ZlibInputBuffer::NumUnreadBytes() const {
size_t read_bytes = next_unread_byte_ - (char*)z_stream_output_.get();
// Returns how many inflated bytes in z_stream_output_ have not yet been handed
// to the caller: bytes written by zlib so far minus bytes already consumed.
size_t ZlibInputStream::NumUnreadBytes() const {
  const char* const output_begin =
      reinterpret_cast<char*>(z_stream_output_.get());
  // Bytes between the start of the output buffer and the read cursor.
  const size_t consumed = next_unread_byte_ - output_begin;
  // Bytes zlib has written into the output buffer so far.
  const size_t inflated = output_buffer_capacity_ - z_stream_->avail_out;
  return inflated - consumed;
}
Status ZlibInputBuffer::ReadNBytes(int64 bytes_to_read, string* result) {
Status ZlibInputStream::ReadNBytes(int64 bytes_to_read, string* result) {
result->clear();
// Read as many bytes as possible from cache.
bytes_to_read -= ReadBytesFromCache(bytes_to_read, result);
while (bytes_to_read > 0) {
// At this point we can be sure that cache has been emptied.
DCHECK(NumUnreadBytes() == 0);
DCHECK_EQ(NumUnreadBytes(), 0);
// Now that the cache is empty we need to inflate more data.
// Step 1. Fill up input buffer.
// We read from file only after the previously read contents have been
// We read from stream only after the previously read contents have been
// completely consumed. This is an optimization and can be removed if
// it causes problems. `ReadFromFile` is capable of handling partially
// it causes problems. `ReadFromStream` is capable of handling partially
// filled up buffers.
if (z_stream_->avail_in == 0) {
TF_RETURN_IF_ERROR(ReadFromFile());
TF_RETURN_IF_ERROR(ReadFromStream());
}
// Step 2. Setup output stream.
z_stream_->next_out = z_stream_output_.get();
next_unread_byte_ = (char*)z_stream_output_.get();
next_unread_byte_ = reinterpret_cast<char*>(z_stream_output_.get());
z_stream_->avail_out = output_buffer_capacity_;
// Step 3. Inflate Inflate Inflate!
@ -161,7 +162,10 @@ Status ZlibInputBuffer::ReadNBytes(int64 bytes_to_read, string* result) {
return Status::OK();
}
Status ZlibInputBuffer::Inflate() {
// TODO(srbs): Implement this.
// Position tracking is not implemented yet; this always reports -1.
int64 ZlibInputStream::Tell() const {
  const int64 position_unknown = -1;
  return position_unknown;
}
Status ZlibInputStream::Inflate() {
int error = inflate(z_stream_.get(), zlib_options_.flush_mode);
if (error != Z_OK && error != Z_STREAM_END) {
string error_string =

View File

@ -13,11 +13,12 @@ See the License for the specific language governing permissions and
limitations under the License.
==============================================================================*/
#ifndef TENSORFLOW_LIB_IO_COMPRESSED_INPUTBUFFER_H_
#define TENSORFLOW_LIB_IO_COMPRESSED_INPUTBUFFER_H_
#ifndef TENSORFLOW_LIB_IO_ZLIB_INPUTSTREAM_H_
#define TENSORFLOW_LIB_IO_ZLIB_INPUTSTREAM_H_
#include <string>
#include "tensorflow/core/lib/core/status.h"
#include "tensorflow/core/lib/io/inputstream_interface.h"
#include "tensorflow/core/lib/io/zlib_compression_options.h"
#include "tensorflow/core/platform/env.h"
#include "tensorflow/core/platform/macros.h"
@ -34,47 +35,48 @@ limitations under the License.
namespace tensorflow {
namespace io {
// An ZlibInputBuffer provides support for reading from a file compressed using
// zlib (http://www.zlib.net/).
// A ZlibInputStream provides support for reading from a stream compressed
// using zlib (http://www.zlib.net/). Buffers the contents of the stream.
//
// A given instance of an ZlibInputBuffer is NOT safe for concurrent use
// A given instance of a ZlibInputStream is NOT safe for concurrent use
// by multiple threads.
class ZlibInputBuffer {
class ZlibInputStream : public InputStreamInterface {
public:
// Create a ZlibInputBuffer for `file` with a buffer of size
// `input_buffer_bytes` bytes for reading contents from `file` and another
// buffer with size `output_buffer_bytes` for caching decompressed contents.
// Does *not* take ownership of "file".
ZlibInputBuffer(RandomAccessFile* file, size_t input_buffer_bytes,
// Create a ZlibInputStream for `input_stream` with a buffer of size
// `input_buffer_bytes` bytes for reading contents from `input_stream` and
// another buffer with size `output_buffer_bytes` for caching decompressed
// contents. Does *not* take ownership of "input_stream".
ZlibInputStream(InputStreamInterface* input_stream, size_t input_buffer_bytes,
size_t output_buffer_bytes,
const ZlibCompressionOptions& zlib_options);
~ZlibInputBuffer();
~ZlibInputStream();
// Reads bytes_to_read bytes into *result, overwriting *result.
//
// Return Status codes:
// OK: If successful.
// OUT_OF_RANGE: If there are not enough bytes to read before
// the end of the file.
// the end of the stream.
// DATA_LOSS: If inflate() fails, we return the error code with the
// error message in `z_stream_->msg`.
// others: If reading from file failed.
Status ReadNBytes(int64 bytes_to_read, string* result);
// others: If reading from stream failed.
Status ReadNBytes(int64 bytes_to_read, string* result) override;
int64 Tell() const override;
private:
RandomAccessFile* file_; // Not owned
int64 file_pos_; // Next position to read from in `file_`
size_t input_buffer_capacity_; // Size of `z_stream_input_`
size_t output_buffer_capacity_; // Size of `z_stream_output_`
char* next_unread_byte_; // Next unread byte in `z_stream_output_`
InputStreamInterface* input_stream_; // Not owned
size_t input_buffer_capacity_; // Size of z_stream_input_
size_t output_buffer_capacity_; // Size of z_stream_output_
char* next_unread_byte_; // Next unread byte in z_stream_output_
// Buffer for storing contents read from compressed file.
// Buffer for storing contents read from compressed stream.
// TODO(srbs): Consider using circular buffers. That would greatly simplify
// the implementation.
std::unique_ptr<Bytef[]> z_stream_input_;
// Buffer for storing inflated contents of `file_`.
// Buffer for storing inflated contents of `input_stream_`.
std::unique_ptr<Bytef[]> z_stream_output_;
ZlibCompressionOptions const zlib_options_;
@ -92,8 +94,8 @@ class ZlibInputBuffer {
// Number of free bytes available at write location.
std::unique_ptr<z_stream> z_stream_;
// Reads data from `file_` and tries to fill up `z_stream_input_` if enough
// unread data is left in `file_`.
// Reads data from `input_stream_` and tries to fill up `z_stream_input_` if
// enough unread data is left in `input_stream_`.
//
// Looks up z_stream_->next_in to check how much data in z_stream_input_
// has already been read. The used data is removed and new data is added to
@ -102,10 +104,10 @@ class ZlibInputBuffer {
// and z_stream_->avail_in stores the number of readable bytes in
// z_stream_input_.
//
// Returns OutOfRange error if NO data could be read from file. Note that this
// won't return an OutOfRange if there wasn't sufficient data in file to
// completely fill up z_stream_input_.
Status ReadFromFile();
// Returns OutOfRange error if NO data could be read from stream. Note that
// this won't return an OutOfRange if there wasn't sufficient data in stream
// to completely fill up z_stream_input_.
Status ReadFromStream();
// Calls `inflate()` and returns DataLoss Status if it failed.
Status Inflate();
@ -131,10 +133,10 @@ class ZlibInputBuffer {
// Returns the size of [next_unread_byte_, z_stream_->next_out)
size_t NumUnreadBytes() const;
TF_DISALLOW_COPY_AND_ASSIGN(ZlibInputBuffer);
TF_DISALLOW_COPY_AND_ASSIGN(ZlibInputStream);
};
} // namespace io
} // namespace tensorflow
#endif // TENSORFLOW_LIB_IO_CompressedInputBuffer_H_
#endif // TENSORFLOW_LIB_IO_ZLIB_INPUTSTREAM_H_