Increase snappy reader input buffer sizes to 1 GB for large tensor sizes
PiperOrigin-RevId: 282710052 Change-Id: I5b5fbb88a5e0c0e1cc478cff064227057ea85e06
This commit is contained in:
parent
bf6e90ccc6
commit
469b6cace0
@ -60,7 +60,14 @@ enum SnapshotMode { READER = 0, WRITER = 1, PASSTHROUGH = 2 };
|
|||||||
// Defaults to 10 GiB per shard.
|
// Defaults to 10 GiB per shard.
|
||||||
const int64 kDefaultShardSizeBytes = 10LL * 1024 * 1024 * 1024;
|
const int64 kDefaultShardSizeBytes = 10LL * 1024 * 1024 * 1024;
|
||||||
|
|
||||||
const int64 kSnappyBufferSizeBytes = 256 << 10; // 256 KB
|
const int64 kSnappyWriterInputBufferSizeBytes = 16 << 20; // 16 MiB
|
||||||
|
const int64 kSnappyWriterOutputBufferSizeBytes = 16 << 20; // 16 MiB
|
||||||
|
|
||||||
|
// The reader input buffer size is deliberately large because the input reader
|
||||||
|
// will throw an error if the compressed block length cannot fit in the input
|
||||||
|
// buffer.
|
||||||
|
const int64 kSnappyReaderInputBufferSizeBytes = 1 << 30; // 1 GiB
|
||||||
|
const int64 kSnappyReaderOutputBufferSizeBytes = 16 << 20; // 16 MiB
|
||||||
|
|
||||||
const size_t kHeaderSize = sizeof(uint64);
|
const size_t kHeaderSize = sizeof(uint64);
|
||||||
|
|
||||||
@ -101,8 +108,8 @@ class SnapshotWriter {
|
|||||||
dest_is_owned_ = true;
|
dest_is_owned_ = true;
|
||||||
} else if (compression_type == io::compression::kSnappy) {
|
} else if (compression_type == io::compression::kSnappy) {
|
||||||
io::SnappyOutputBuffer* snappy_output_buffer = new io::SnappyOutputBuffer(
|
io::SnappyOutputBuffer* snappy_output_buffer = new io::SnappyOutputBuffer(
|
||||||
dest, /*input_buffer_bytes=*/kSnappyBufferSizeBytes,
|
dest, /*input_buffer_bytes=*/kSnappyWriterInputBufferSizeBytes,
|
||||||
/*output_buffer_bytes=*/kSnappyBufferSizeBytes);
|
/*output_buffer_bytes=*/kSnappyWriterOutputBufferSizeBytes);
|
||||||
dest_ = snappy_output_buffer;
|
dest_ = snappy_output_buffer;
|
||||||
dest_is_owned_ = true;
|
dest_is_owned_ = true;
|
||||||
}
|
}
|
||||||
@ -184,8 +191,8 @@ class SnapshotReader {
|
|||||||
zlib_options.output_buffer_size, zlib_options, true));
|
zlib_options.output_buffer_size, zlib_options, true));
|
||||||
} else if (compression_type_ == io::compression::kSnappy) {
|
} else if (compression_type_ == io::compression::kSnappy) {
|
||||||
input_stream_ = absl::make_unique<io::SnappyInputBuffer>(
|
input_stream_ = absl::make_unique<io::SnappyInputBuffer>(
|
||||||
file_, /*input_buffer_bytes=*/kSnappyBufferSizeBytes,
|
file_, /*input_buffer_bytes=*/kSnappyReaderInputBufferSizeBytes,
|
||||||
/*output_buffer_bytes=*/kSnappyBufferSizeBytes);
|
/*output_buffer_bytes=*/kSnappyReaderOutputBufferSizeBytes);
|
||||||
}
|
}
|
||||||
#endif // IS_SLIM_BUILD
|
#endif // IS_SLIM_BUILD
|
||||||
}
|
}
|
||||||
|
@ -265,13 +265,14 @@ class SnapshotDatasetTest(reader_dataset_ops_test_base.TFRecordDatasetTestBase,
|
|||||||
reader_buffer_size=10))
|
reader_buffer_size=10))
|
||||||
self.assertDatasetProduces(dataset2, expected, assert_items_equal=True)
|
self.assertDatasetProduces(dataset2, expected, assert_items_equal=True)
|
||||||
|
|
||||||
|
# Not testing Snappy here because Snappy reads currently require a lot of
|
||||||
|
# memory.
|
||||||
@combinations.generate(
|
@combinations.generate(
|
||||||
combinations.times(
|
combinations.times(
|
||||||
test_base.default_test_combinations(),
|
test_base.default_test_combinations(),
|
||||||
combinations.times(
|
combinations.times(
|
||||||
combinations.combine(compression=[
|
combinations.combine(compression=[
|
||||||
snapshot.COMPRESSION_NONE, snapshot.COMPRESSION_GZIP,
|
snapshot.COMPRESSION_NONE, snapshot.COMPRESSION_GZIP
|
||||||
snapshot.COMPRESSION_SNAPPY
|
|
||||||
]),
|
]),
|
||||||
combinations.combine(threads=2, size=[1, 2]) +
|
combinations.combine(threads=2, size=[1, 2]) +
|
||||||
combinations.combine(threads=8, size=[1, 4, 8]))))
|
combinations.combine(threads=8, size=[1, 4, 8]))))
|
||||||
|
Loading…
Reference in New Issue
Block a user