[tf.data service] Improve pointer/reference usage in journal.

This CL updates the code to use mutable references for output parameters, since they cannot be null.

PiperOrigin-RevId: 329004163
Change-Id: Ieb52ea158a36a2440a30f54819178f45af09f7ac
This commit is contained in:
Andrew Audibert 2020-08-28 13:57:23 -07:00 committed by TensorFlower Gardener
parent 880b2d7176
commit 6dcc90955d
4 changed files with 22 additions and 22 deletions

View File

@ -117,7 +117,7 @@ Status DataServiceDispatcherImpl::Start() {
Update update; Update update;
bool end_of_journal = false; bool end_of_journal = false;
FileJournalReader reader(Env::Default(), JournalDir(config_.work_dir())); FileJournalReader reader(Env::Default(), JournalDir(config_.work_dir()));
Status s = reader.Read(&update, &end_of_journal); Status s = reader.Read(update, end_of_journal);
if (errors::IsNotFound(s)) { if (errors::IsNotFound(s)) {
LOG(INFO) << "No journal found. Starting dispatcher from new state."; LOG(INFO) << "No journal found. Starting dispatcher from new state.";
} else if (!s.ok()) { } else if (!s.ok()) {
@ -125,7 +125,7 @@ Status DataServiceDispatcherImpl::Start() {
} else { } else {
while (!end_of_journal) { while (!end_of_journal) {
TF_RETURN_IF_ERROR(ApplyWithoutJournaling(update)); TF_RETURN_IF_ERROR(ApplyWithoutJournaling(update));
TF_RETURN_IF_ERROR(reader.Read(&update, &end_of_journal)); TF_RETURN_IF_ERROR(reader.Read(update, end_of_journal));
} }
} }
// Initialize the journal writer in `Start` so that we fail fast in case it // Initialize the journal writer in `Start` so that we fail fast in case it

View File

@ -96,7 +96,7 @@ Status FileJournalReader::EnsureInitialized() {
return UpdateFile(DataServiceJournalFile(journal_dir_, 0)); return UpdateFile(DataServiceJournalFile(journal_dir_, 0));
} }
Status FileJournalReader::Read(Update* update, bool* end_of_journal) { Status FileJournalReader::Read(Update& update, bool& end_of_journal) {
TF_RETURN_IF_ERROR(EnsureInitialized()); TF_RETURN_IF_ERROR(EnsureInitialized());
while (true) { while (true) {
tstring record; tstring record;
@ -108,20 +108,20 @@ Status FileJournalReader::Read(Update* update, bool* end_of_journal) {
if (errors::IsNotFound(env_->FileExists(next_journal_file))) { if (errors::IsNotFound(env_->FileExists(next_journal_file))) {
VLOG(3) << "Next journal file " << next_journal_file VLOG(3) << "Next journal file " << next_journal_file
<< " does not exist. End of journal reached."; << " does not exist. End of journal reached.";
*end_of_journal = true; end_of_journal = true;
return Status::OK(); return Status::OK();
} }
TF_RETURN_IF_ERROR(UpdateFile(next_journal_file)); TF_RETURN_IF_ERROR(UpdateFile(next_journal_file));
continue; continue;
} }
TF_RETURN_IF_ERROR(s); TF_RETURN_IF_ERROR(s);
if (!update->ParseFromString(record)) { if (!update.ParseFromString(record)) {
return errors::DataLoss("Failed to parse journal record."); return errors::DataLoss("Failed to parse journal record.");
} }
if (VLOG_IS_ON(4)) { if (VLOG_IS_ON(4)) {
VLOG(4) << "Read journal entry: " << update->DebugString(); VLOG(4) << "Read journal entry: " << update.DebugString();
} }
*end_of_journal = false; end_of_journal = false;
return Status::OK(); return Status::OK();
} }
} }

View File

@ -77,9 +77,9 @@ class FileJournalWriter : public JournalWriter {
class JournalReader { class JournalReader {
public: public:
virtual ~JournalReader() = default; virtual ~JournalReader() = default;
// Reads the next update from the journal. Sets `*end_of_journal=true` if // Reads the next update from the journal. Sets `end_of_journal=true` if
// there are no more updates left in the journal. // there are no more updates left in the journal.
virtual Status Read(Update* update, bool* end_of_journal) = 0; virtual Status Read(Update& update, bool& end_of_journal) = 0;
}; };
// JournalReader is not thread-safe, requiring external synchronization when // JournalReader is not thread-safe, requiring external synchronization when
@ -93,7 +93,7 @@ class FileJournalReader : public JournalReader {
FileJournalReader(const FileJournalReader&) = delete; FileJournalReader(const FileJournalReader&) = delete;
FileJournalReader& operator=(const FileJournalReader&) = delete; FileJournalReader& operator=(const FileJournalReader&) = delete;
Status Read(Update* update, bool* end_of_journal) override; Status Read(Update& update, bool& end_of_journal) override;
private: private:
// Initializes the reader if it is not yet initialized. // Initializes the reader if it is not yet initialized.

View File

@ -28,12 +28,12 @@ namespace data {
namespace { namespace {
using ::testing::HasSubstr; using ::testing::HasSubstr;
bool NewJournalDir(std::string* journal_dir) { bool NewJournalDir(std::string& journal_dir) {
std::string filename = testing::TmpDir(); std::string filename = testing::TmpDir();
if (!Env::Default()->CreateUniqueFileName(&filename, "journal_dir")) { if (!Env::Default()->CreateUniqueFileName(&filename, "journal_dir")) {
return false; return false;
} }
*journal_dir = filename; journal_dir = filename;
return true; return true;
} }
@ -67,7 +67,7 @@ Status CheckJournalContent(StringPiece journal_dir,
for (const auto& update : expected) { for (const auto& update : expected) {
Update result; Update result;
bool end_of_journal = true; bool end_of_journal = true;
TF_RETURN_IF_ERROR(reader.Read(&result, &end_of_journal)); TF_RETURN_IF_ERROR(reader.Read(result, end_of_journal));
EXPECT_FALSE(end_of_journal); EXPECT_FALSE(end_of_journal);
// We can't use the testing::EqualsProto matcher because it is not available // We can't use the testing::EqualsProto matcher because it is not available
// in OSS. // in OSS.
@ -75,7 +75,7 @@ Status CheckJournalContent(StringPiece journal_dir,
} }
Update result; Update result;
bool end_of_journal = false; bool end_of_journal = false;
TF_RETURN_IF_ERROR(reader.Read(&result, &end_of_journal)); TF_RETURN_IF_ERROR(reader.Read(result, end_of_journal));
EXPECT_TRUE(end_of_journal); EXPECT_TRUE(end_of_journal);
return Status::OK(); return Status::OK();
} }
@ -83,7 +83,7 @@ Status CheckJournalContent(StringPiece journal_dir,
TEST(Journal, RoundTripMultiple) { TEST(Journal, RoundTripMultiple) {
std::string journal_dir; std::string journal_dir;
EXPECT_TRUE(NewJournalDir(&journal_dir)); EXPECT_TRUE(NewJournalDir(journal_dir));
std::vector<Update> updates = {MakeCreateJobUpdate(), std::vector<Update> updates = {MakeCreateJobUpdate(),
MakeRegisterDatasetUpdate(), MakeRegisterDatasetUpdate(),
MakeFinishTaskUpdate()}; MakeFinishTaskUpdate()};
@ -97,7 +97,7 @@ TEST(Journal, RoundTripMultiple) {
TEST(Journal, AppendExistingJournal) { TEST(Journal, AppendExistingJournal) {
std::string journal_dir; std::string journal_dir;
EXPECT_TRUE(NewJournalDir(&journal_dir)); EXPECT_TRUE(NewJournalDir(journal_dir));
std::vector<Update> updates = {MakeCreateJobUpdate(), std::vector<Update> updates = {MakeCreateJobUpdate(),
MakeRegisterDatasetUpdate(), MakeRegisterDatasetUpdate(),
MakeFinishTaskUpdate()}; MakeFinishTaskUpdate()};
@ -111,17 +111,17 @@ TEST(Journal, AppendExistingJournal) {
TEST(Journal, MissingFile) { TEST(Journal, MissingFile) {
std::string journal_dir; std::string journal_dir;
EXPECT_TRUE(NewJournalDir(&journal_dir)); EXPECT_TRUE(NewJournalDir(journal_dir));
FileJournalReader reader(Env::Default(), journal_dir); FileJournalReader reader(Env::Default(), journal_dir);
Update result; Update result;
bool end_of_journal = true; bool end_of_journal = true;
Status s = reader.Read(&result, &end_of_journal); Status s = reader.Read(result, end_of_journal);
EXPECT_TRUE(errors::IsNotFound(s)); EXPECT_TRUE(errors::IsNotFound(s));
} }
TEST(Journal, NonRecordData) { TEST(Journal, NonRecordData) {
std::string journal_dir; std::string journal_dir;
EXPECT_TRUE(NewJournalDir(&journal_dir)); EXPECT_TRUE(NewJournalDir(journal_dir));
TF_ASSERT_OK(Env::Default()->RecursivelyCreateDir(journal_dir)); TF_ASSERT_OK(Env::Default()->RecursivelyCreateDir(journal_dir));
{ {
@ -134,14 +134,14 @@ TEST(Journal, NonRecordData) {
FileJournalReader reader(Env::Default(), journal_dir); FileJournalReader reader(Env::Default(), journal_dir);
Update result; Update result;
bool end_of_journal = true; bool end_of_journal = true;
Status s = reader.Read(&result, &end_of_journal); Status s = reader.Read(result, end_of_journal);
EXPECT_THAT(s.error_message(), HasSubstr("corrupted record")); EXPECT_THAT(s.error_message(), HasSubstr("corrupted record"));
EXPECT_EQ(s.code(), error::DATA_LOSS); EXPECT_EQ(s.code(), error::DATA_LOSS);
} }
TEST(Journal, InvalidRecordData) { TEST(Journal, InvalidRecordData) {
std::string journal_dir; std::string journal_dir;
EXPECT_TRUE(NewJournalDir(&journal_dir)); EXPECT_TRUE(NewJournalDir(journal_dir));
TF_ASSERT_OK(Env::Default()->RecursivelyCreateDir(journal_dir)); TF_ASSERT_OK(Env::Default()->RecursivelyCreateDir(journal_dir));
{ {
@ -155,7 +155,7 @@ TEST(Journal, InvalidRecordData) {
FileJournalReader reader(Env::Default(), journal_dir); FileJournalReader reader(Env::Default(), journal_dir);
Update result; Update result;
bool end_of_journal = true; bool end_of_journal = true;
Status s = reader.Read(&result, &end_of_journal); Status s = reader.Read(result, end_of_journal);
EXPECT_THAT(s.error_message(), HasSubstr("Failed to parse journal record")); EXPECT_THAT(s.error_message(), HasSubstr("Failed to parse journal record"));
EXPECT_EQ(s.code(), error::DATA_LOSS); EXPECT_EQ(s.code(), error::DATA_LOSS);
} }