Enable Cord reading and writing for InputStream and OutputStreams

PiperOrigin-RevId: 343014311
Change-Id: If1b7fee3cb1432e35952104984e64e54a1aa2f72
This commit is contained in:
Frank Chen 2020-11-17 22:17:49 -08:00 committed by TensorFlower Gardener
parent 8d06472606
commit 516ae286f6
15 changed files with 29 additions and 27 deletions

View File

@ -263,7 +263,7 @@ uint32 Extend(uint32 crc, const char *buf, size_t size) {
return l ^ 0xffffffffu; return l ^ 0xffffffffu;
} }
#if defined(PLATFORM_GOOGLE) #if defined(TF_CORD_SUPPORT)
uint32 Extend(uint32 crc, const absl::Cord &cord) { uint32 Extend(uint32 crc, const absl::Cord &cord) {
for (absl::string_view fragment : cord.Chunks()) { for (absl::string_view fragment : cord.Chunks()) {
crc = Extend(crc, fragment.data(), fragment.size()); crc = Extend(crc, fragment.data(), fragment.size());

View File

@ -32,11 +32,14 @@ namespace crc32c {
// crc32c of a stream of data. // crc32c of a stream of data.
extern uint32 Extend(uint32 init_crc, const char* data, size_t n); extern uint32 Extend(uint32 init_crc, const char* data, size_t n);
#if defined(TF_CORD_SUPPORT)
extern uint32 Extend(uint32 init_crc, const absl::Cord& cord);
#endif
// Return the crc32c of data[0,n-1] // Return the crc32c of data[0,n-1]
inline uint32 Value(const char* data, size_t n) { return Extend(0, data, n); } inline uint32 Value(const char* data, size_t n) { return Extend(0, data, n); }
#if defined(PLATFORM_GOOGLE) #if defined(TF_CORD_SUPPORT)
extern uint32 Extend(uint32 init_crc, const absl::Cord& cord);
inline uint32 Value(const absl::Cord& cord) { return Extend(0, cord); } inline uint32 Value(const absl::Cord& cord) { return Extend(0, cord); }
#endif #endif

View File

@ -37,7 +37,7 @@ class InputStreamInterface {
// * OUT_OF_RANGE - not enough bytes remaining before end of file. // * OUT_OF_RANGE - not enough bytes remaining before end of file.
virtual Status ReadNBytes(int64 bytes_to_read, tstring* result) = 0; virtual Status ReadNBytes(int64 bytes_to_read, tstring* result) = 0;
#if defined(PLATFORM_GOOGLE) #if defined(TF_CORD_SUPPORT)
// Reads the next bytes_to_read from the file. Typical return codes: // Reads the next bytes_to_read from the file. Typical return codes:
// * OK - in case of success. // * OK - in case of success.
// * OUT_OF_RANGE - not enough bytes remaining before end of file. // * OUT_OF_RANGE - not enough bytes remaining before end of file.

View File

@ -49,7 +49,7 @@ Status RandomAccessInputStream::ReadNBytes(int64 bytes_to_read,
return s; return s;
} }
#if defined(PLATFORM_GOOGLE) #if defined(TF_CORD_SUPPORT)
Status RandomAccessInputStream::ReadNBytes(int64 bytes_to_read, Status RandomAccessInputStream::ReadNBytes(int64 bytes_to_read,
absl::Cord* result) { absl::Cord* result) {
if (bytes_to_read < 0) { if (bytes_to_read < 0) {

View File

@ -35,7 +35,7 @@ class RandomAccessInputStream : public InputStreamInterface {
Status ReadNBytes(int64 bytes_to_read, tstring* result) override; Status ReadNBytes(int64 bytes_to_read, tstring* result) override;
#if defined(PLATFORM_GOOGLE) #if defined(TF_CORD_SUPPORT)
Status ReadNBytes(int64 bytes_to_read, absl::Cord* result) override; Status ReadNBytes(int64 bytes_to_read, absl::Cord* result) override;
#endif #endif

View File

@ -115,7 +115,7 @@ Status RecordWriter::WriteRecord(StringPiece data) {
return dest_->Append(StringPiece(footer, sizeof(footer))); return dest_->Append(StringPiece(footer, sizeof(footer)));
} }
#if defined(PLATFORM_GOOGLE) #if defined(TF_CORD_SUPPORT)
Status RecordWriter::WriteRecord(const absl::Cord& data) { Status RecordWriter::WriteRecord(const absl::Cord& data) {
if (dest_ == nullptr) { if (dest_ == nullptr) {
return Status(::tensorflow::error::FAILED_PRECONDITION, return Status(::tensorflow::error::FAILED_PRECONDITION,

View File

@ -68,8 +68,8 @@ class RecordWriter {
// Create a writer that will append data to "*dest". // Create a writer that will append data to "*dest".
// "*dest" must be initially empty. // "*dest" must be initially empty.
// "*dest" must remain live while this Writer is in use. // "*dest" must remain live while this Writer is in use.
RecordWriter(WritableFile* dest, explicit RecordWriter(WritableFile* dest, const RecordWriterOptions& options =
const RecordWriterOptions& options = RecordWriterOptions()); RecordWriterOptions());
// Calls Close() and logs if an error occurs. // Calls Close() and logs if an error occurs.
// //
@ -79,7 +79,7 @@ class RecordWriter {
Status WriteRecord(StringPiece data); Status WriteRecord(StringPiece data);
#if defined(PLATFORM_GOOGLE) #if defined(TF_CORD_SUPPORT)
Status WriteRecord(const absl::Cord& data); Status WriteRecord(const absl::Cord& data);
#endif #endif
@ -98,12 +98,13 @@ class RecordWriter {
// "header[0,kHeaderSize-1]". The record-header is based on data[0, n-1]. // "header[0,kHeaderSize-1]". The record-header is based on data[0, n-1].
inline static void PopulateHeader(char* header, const char* data, size_t n); inline static void PopulateHeader(char* header, const char* data, size_t n);
inline static void PopulateHeader(char* header, const absl::Cord& data);
// Utility method to populate TFRecord footers. Populates record-footer in // Utility method to populate TFRecord footers. Populates record-footer in
// "footer[0,kFooterSize-1]". The record-footer is based on data[0, n-1]. // "footer[0,kFooterSize-1]". The record-footer is based on data[0, n-1].
inline static void PopulateFooter(char* footer, const char* data, size_t n); inline static void PopulateFooter(char* footer, const char* data, size_t n);
#if defined(PLATFORM_GOOGLE) #if defined(TF_CORD_SUPPORT)
inline static void PopulateHeader(char* header, const absl::Cord& data);
inline static void PopulateFooter(char* footer, const absl::Cord& data); inline static void PopulateFooter(char* footer, const absl::Cord& data);
#endif #endif
@ -115,7 +116,7 @@ class RecordWriter {
return crc32c::Mask(crc32c::Value(data, n)); return crc32c::Mask(crc32c::Value(data, n));
} }
#if defined(PLATFORM_GOOGLE) #if defined(TF_CORD_SUPPORT)
inline static uint32 MaskedCrc(const absl::Cord& data) { inline static uint32 MaskedCrc(const absl::Cord& data) {
return crc32c::Mask(crc32c::Value(data)); return crc32c::Mask(crc32c::Value(data));
} }
@ -134,7 +135,7 @@ void RecordWriter::PopulateFooter(char* footer, const char* data, size_t n) {
core::EncodeFixed32(footer, MaskedCrc(data, n)); core::EncodeFixed32(footer, MaskedCrc(data, n));
} }
#if defined(PLATFORM_GOOGLE) #if defined(TF_CORD_SUPPORT)
void RecordWriter::PopulateHeader(char* header, const absl::Cord& data) { void RecordWriter::PopulateHeader(char* header, const absl::Cord& data) {
core::EncodeFixed64(header + 0, data.size()); core::EncodeFixed64(header + 0, data.size());
core::EncodeFixed32(header + sizeof(uint64), core::EncodeFixed32(header + sizeof(uint64),

View File

@ -62,7 +62,7 @@ class StringDest : public WritableFile {
contents_->append(slice.data(), slice.size()); contents_->append(slice.data(), slice.size());
return Status::OK(); return Status::OK();
} }
#if defined(PLATFORM_GOOGLE) #if defined(TF_CORD_SUPPORT)
Status Append(const absl::Cord& data) override { Status Append(const absl::Cord& data) override {
contents_->append(std::string(data)); contents_->append(std::string(data));
return Status::OK(); return Status::OK();
@ -136,7 +136,7 @@ class RecordioTest : public ::testing::Test {
TF_ASSERT_OK(writer_->WriteRecord(StringPiece(msg))); TF_ASSERT_OK(writer_->WriteRecord(StringPiece(msg)));
} }
#if defined(PLATFORM_GOOGLE) #if defined(TF_CORD_SUPPORT)
void Write(const absl::Cord& msg) { void Write(const absl::Cord& msg) {
ASSERT_TRUE(!reading_) << "Write() after starting to read"; ASSERT_TRUE(!reading_) << "Write() after starting to read";
TF_ASSERT_OK(writer_->WriteRecord(msg)); TF_ASSERT_OK(writer_->WriteRecord(msg));
@ -204,7 +204,7 @@ TEST_F(RecordioTest, ReadWrite) {
ASSERT_EQ("EOF", Read()); // Make sure reads at eof work ASSERT_EQ("EOF", Read()); // Make sure reads at eof work
} }
#if defined(PLATFORM_GOOGLE) #if defined(TF_CORD_SUPPORT)
TEST_F(RecordioTest, ReadWriteCords) { TEST_F(RecordioTest, ReadWriteCords) {
Write(absl::Cord("foo")); Write(absl::Cord("foo"));
Write(absl::Cord("bar")); Write(absl::Cord("bar"));

View File

@ -68,7 +68,7 @@ Status SnappyInputStream::ReadNBytes(int64 bytes_to_read, tstring* result) {
return Status::OK(); return Status::OK();
} }
#if defined(PLATFORM_GOOGLE) #if defined(TF_CORD_SUPPORT)
Status SnappyInputStream::ReadNBytes(int64 bytes_to_read, absl::Cord* result) { Status SnappyInputStream::ReadNBytes(int64 bytes_to_read, absl::Cord* result) {
// TODO(frankchn): Optimize this instead of bouncing through the buffer. // TODO(frankchn): Optimize this instead of bouncing through the buffer.
tstring buf; tstring buf;

View File

@ -46,7 +46,7 @@ class SnappyInputStream : public InputStreamInterface {
// others: If reading from stream failed. // others: If reading from stream failed.
Status ReadNBytes(int64 bytes_to_read, tstring* result) override; Status ReadNBytes(int64 bytes_to_read, tstring* result) override;
#if defined(PLATFORM_GOOGLE) #if defined(TF_CORD_SUPPORT)
Status ReadNBytes(int64 bytes_to_read, absl::Cord* result) override; Status ReadNBytes(int64 bytes_to_read, absl::Cord* result) override;
#endif #endif

View File

@ -40,7 +40,7 @@ SnappyOutputBuffer::~SnappyOutputBuffer() {
Status SnappyOutputBuffer::Append(StringPiece data) { return Write(data); } Status SnappyOutputBuffer::Append(StringPiece data) { return Write(data); }
#if defined(PLATFORM_GOOGLE) #if defined(TF_CORD_SUPPORT)
Status SnappyOutputBuffer::Append(const absl::Cord& cord) { Status SnappyOutputBuffer::Append(const absl::Cord& cord) {
for (absl::string_view fragment : cord.Chunks()) { for (absl::string_view fragment : cord.Chunks()) {
TF_RETURN_IF_ERROR(Append(fragment)); TF_RETURN_IF_ERROR(Append(fragment));

View File

@ -65,7 +65,7 @@ class SnappyOutputBuffer : public WritableFile {
// later time. To immediately write contents to file call `Flush()`. // later time. To immediately write contents to file call `Flush()`.
Status Append(StringPiece data) override; Status Append(StringPiece data) override;
#if defined(PLATFORM_GOOGLE) #if defined(TF_CORD_SUPPORT)
Status Append(const absl::Cord& cord) override; Status Append(const absl::Cord& cord) override;
#endif #endif

View File

@ -190,7 +190,7 @@ Status ZlibOutputBuffer::Append(StringPiece data) {
return Status::OK(); return Status::OK();
} }
#if defined(PLATFORM_GOOGLE) #if defined(TF_CORD_SUPPORT)
Status ZlibOutputBuffer::Append(const absl::Cord& cord) { Status ZlibOutputBuffer::Append(const absl::Cord& cord) {
for (absl::string_view fragment : cord.Chunks()) { for (absl::string_view fragment : cord.Chunks()) {
TF_RETURN_IF_ERROR(Append(fragment)); TF_RETURN_IF_ERROR(Append(fragment));

View File

@ -65,7 +65,7 @@ class ZlibOutputBuffer : public WritableFile {
// To immediately write contents to file call `Flush()`. // To immediately write contents to file call `Flush()`.
Status Append(StringPiece data) override; Status Append(StringPiece data) override;
#if defined(PLATFORM_GOOGLE) #if defined(TF_CORD_SUPPORT)
Status Append(const absl::Cord& cord) override; Status Append(const absl::Cord& cord) override;
#endif #endif

View File

@ -751,8 +751,7 @@ class RandomAccessFile {
virtual tensorflow::Status Read(uint64 offset, size_t n, StringPiece* result, virtual tensorflow::Status Read(uint64 offset, size_t n, StringPiece* result,
char* scratch) const = 0; char* scratch) const = 0;
// TODO(ebrevdo): Remove this ifdef when absl is updated. #if defined(TF_CORD_SUPPORT)
#if defined(PLATFORM_GOOGLE)
/// \brief Read up to `n` bytes from the file starting at `offset`. /// \brief Read up to `n` bytes from the file starting at `offset`.
virtual tensorflow::Status Read(uint64 offset, size_t n, virtual tensorflow::Status Read(uint64 offset, size_t n,
absl::Cord* cord) const { absl::Cord* cord) const {
@ -778,8 +777,7 @@ class WritableFile {
/// \brief Append 'data' to the file. /// \brief Append 'data' to the file.
virtual tensorflow::Status Append(StringPiece data) = 0; virtual tensorflow::Status Append(StringPiece data) = 0;
// TODO(ebrevdo): Remove this ifdef when absl is updated. #if defined(TF_CORD_SUPPORT)
#if defined(PLATFORM_GOOGLE)
// \brief Append 'data' to the file. // \brief Append 'data' to the file.
virtual tensorflow::Status Append(const absl::Cord& cord) { virtual tensorflow::Status Append(const absl::Cord& cord) {
return errors::Unimplemented("Append(absl::Cord) is not implemented"); return errors::Unimplemented("Append(absl::Cord) is not implemented");