diff --git a/tensorflow/core/data/service/dispatcher_impl.cc b/tensorflow/core/data/service/dispatcher_impl.cc index 398b91b984c..bf48e8e5722 100644 --- a/tensorflow/core/data/service/dispatcher_impl.cc +++ b/tensorflow/core/data/service/dispatcher_impl.cc @@ -117,7 +117,7 @@ Status DataServiceDispatcherImpl::Start() { Update update; bool end_of_journal = false; FileJournalReader reader(Env::Default(), JournalDir(config_.work_dir())); - Status s = reader.Read(&update, &end_of_journal); + Status s = reader.Read(update, end_of_journal); if (errors::IsNotFound(s)) { LOG(INFO) << "No journal found. Starting dispatcher from new state."; } else if (!s.ok()) { @@ -125,7 +125,7 @@ Status DataServiceDispatcherImpl::Start() { } else { while (!end_of_journal) { TF_RETURN_IF_ERROR(ApplyWithoutJournaling(update)); - TF_RETURN_IF_ERROR(reader.Read(&update, &end_of_journal)); + TF_RETURN_IF_ERROR(reader.Read(update, end_of_journal)); } } // Initialize the journal writer in `Start` so that we fail fast in case it diff --git a/tensorflow/core/data/service/journal.cc b/tensorflow/core/data/service/journal.cc index b0ce0876c69..979fc78b7c0 100644 --- a/tensorflow/core/data/service/journal.cc +++ b/tensorflow/core/data/service/journal.cc @@ -96,7 +96,7 @@ Status FileJournalReader::EnsureInitialized() { return UpdateFile(DataServiceJournalFile(journal_dir_, 0)); } -Status FileJournalReader::Read(Update* update, bool* end_of_journal) { +Status FileJournalReader::Read(Update& update, bool& end_of_journal) { TF_RETURN_IF_ERROR(EnsureInitialized()); while (true) { tstring record; @@ -108,20 +108,20 @@ Status FileJournalReader::Read(Update* update, bool* end_of_journal) { if (errors::IsNotFound(env_->FileExists(next_journal_file))) { VLOG(3) << "Next journal file " << next_journal_file << " does not exist. End of journal reached."; - *end_of_journal = true; + end_of_journal = true; return Status::OK(); } TF_RETURN_IF_ERROR(UpdateFile(next_journal_file)); continue; } TF_RETURN_IF_ERROR(s); - if (!update->ParseFromString(record)) { + if (!update.ParseFromString(record)) { return errors::DataLoss("Failed to parse journal record."); } if (VLOG_IS_ON(4)) { - VLOG(4) << "Read journal entry: " << update->DebugString(); + VLOG(4) << "Read journal entry: " << update.DebugString(); } - *end_of_journal = false; + end_of_journal = false; return Status::OK(); } } diff --git a/tensorflow/core/data/service/journal.h b/tensorflow/core/data/service/journal.h index 3483497705e..e31830e8c35 100644 --- a/tensorflow/core/data/service/journal.h +++ b/tensorflow/core/data/service/journal.h @@ -77,9 +77,9 @@ class FileJournalWriter : public JournalWriter { class JournalReader { public: virtual ~JournalReader() = default; - // Reads the next update from the journal. Sets `*end_of_journal=true` if + // Reads the next update from the journal. Sets `end_of_journal=true` if // there are no more updates left in the journal. - virtual Status Read(Update* update, bool* end_of_journal) = 0; + virtual Status Read(Update& update, bool& end_of_journal) = 0; }; // JournalReader is not thread-safe, requiring external synchronization when @@ -93,7 +93,7 @@ class FileJournalReader : public JournalReader { FileJournalReader(const FileJournalReader&) = delete; FileJournalReader& operator=(const FileJournalReader&) = delete; - Status Read(Update* update, bool* end_of_journal) override; + Status Read(Update& update, bool& end_of_journal) override; private: // Initializes the reader if it is not yet initialized. diff --git a/tensorflow/core/data/service/journal_test.cc b/tensorflow/core/data/service/journal_test.cc index 313b216fe76..3f55447cc68 100644 --- a/tensorflow/core/data/service/journal_test.cc +++ b/tensorflow/core/data/service/journal_test.cc @@ -28,12 +28,12 @@ namespace data { namespace { using ::testing::HasSubstr; -bool NewJournalDir(std::string* journal_dir) { +bool NewJournalDir(std::string& journal_dir) { std::string filename = testing::TmpDir(); if (!Env::Default()->CreateUniqueFileName(&filename, "journal_dir")) { return false; } - *journal_dir = filename; + journal_dir = filename; return true; } @@ -67,7 +67,7 @@ Status CheckJournalContent(StringPiece journal_dir, for (const auto& update : expected) { Update result; bool end_of_journal = true; - TF_RETURN_IF_ERROR(reader.Read(&result, &end_of_journal)); + TF_RETURN_IF_ERROR(reader.Read(result, end_of_journal)); EXPECT_FALSE(end_of_journal); // We can't use the testing::EqualsProto matcher because it is not available // in OSS. @@ -75,7 +75,7 @@ Status CheckJournalContent(StringPiece journal_dir, } Update result; bool end_of_journal = false; - TF_RETURN_IF_ERROR(reader.Read(&result, &end_of_journal)); + TF_RETURN_IF_ERROR(reader.Read(result, end_of_journal)); EXPECT_TRUE(end_of_journal); return Status::OK(); } @@ -83,7 +83,7 @@ Status CheckJournalContent(StringPiece journal_dir, TEST(Journal, RoundTripMultiple) { std::string journal_dir; - EXPECT_TRUE(NewJournalDir(&journal_dir)); + EXPECT_TRUE(NewJournalDir(journal_dir)); std::vector updates = {MakeCreateJobUpdate(), MakeRegisterDatasetUpdate(), MakeFinishTaskUpdate()}; @@ -97,7 +97,7 @@ TEST(Journal, RoundTripMultiple) { TEST(Journal, AppendExistingJournal) { std::string journal_dir; - EXPECT_TRUE(NewJournalDir(&journal_dir)); + EXPECT_TRUE(NewJournalDir(journal_dir)); std::vector updates = {MakeCreateJobUpdate(), MakeRegisterDatasetUpdate(), MakeFinishTaskUpdate()}; @@ -111,17 +111,17 @@ TEST(Journal, AppendExistingJournal) { TEST(Journal, MissingFile) { std::string journal_dir; - EXPECT_TRUE(NewJournalDir(&journal_dir)); + EXPECT_TRUE(NewJournalDir(journal_dir)); FileJournalReader reader(Env::Default(), journal_dir); Update result; bool end_of_journal = true; - Status s = reader.Read(&result, &end_of_journal); + Status s = reader.Read(result, end_of_journal); EXPECT_TRUE(errors::IsNotFound(s)); } TEST(Journal, NonRecordData) { std::string journal_dir; - EXPECT_TRUE(NewJournalDir(&journal_dir)); + EXPECT_TRUE(NewJournalDir(journal_dir)); TF_ASSERT_OK(Env::Default()->RecursivelyCreateDir(journal_dir)); { @@ -134,14 +134,14 @@ TEST(Journal, NonRecordData) { FileJournalReader reader(Env::Default(), journal_dir); Update result; bool end_of_journal = true; - Status s = reader.Read(&result, &end_of_journal); + Status s = reader.Read(result, end_of_journal); EXPECT_THAT(s.error_message(), HasSubstr("corrupted record")); EXPECT_EQ(s.code(), error::DATA_LOSS); } TEST(Journal, InvalidRecordData) { std::string journal_dir; - EXPECT_TRUE(NewJournalDir(&journal_dir)); + EXPECT_TRUE(NewJournalDir(journal_dir)); TF_ASSERT_OK(Env::Default()->RecursivelyCreateDir(journal_dir)); { @@ -155,7 +155,7 @@ TEST(Journal, InvalidRecordData) { FileJournalReader reader(Env::Default(), journal_dir); Update result; bool end_of_journal = true; - Status s = reader.Read(&result, &end_of_journal); + Status s = reader.Read(result, end_of_journal); EXPECT_THAT(s.error_message(), HasSubstr("Failed to parse journal record")); EXPECT_EQ(s.code(), error::DATA_LOSS); }