Merge pull request #37962 from zhaozheng09:bugfix/hdfs_write_max_limit
PiperOrigin-RevId: 304002135 Change-Id: Ifd15a8768a37d6525d72b604036a16aad69b8bbf
This commit is contained in:
commit
756a706221
tensorflow/core/platform/hadoop
@ -306,9 +306,24 @@ class HDFSWritableFile : public WritableFile {
|
||||
}
|
||||
|
||||
Status Append(StringPiece data) override {
|
||||
if (libhdfs()->hdfsWrite(fs_, file_, data.data(),
|
||||
static_cast<tSize>(data.size())) == -1) {
|
||||
return IOError(filename_, errno);
|
||||
size_t cur_pos = 0, write_len = 0;
|
||||
bool retry = false;
|
||||
// max() - 2 can avoid OutOfMemoryError in JVM .
|
||||
static const size_t max_len_once =
|
||||
static_cast<size_t>(std::numeric_limits<tSize>::max() - 2);
|
||||
while (cur_pos < data.size()) {
|
||||
write_len = std::min(data.size() - cur_pos, max_len_once);
|
||||
tSize w = libhdfs()->hdfsWrite(fs_, file_, data.data() + cur_pos,
|
||||
static_cast<tSize>(write_len));
|
||||
if (w == -1) {
|
||||
if (!retry && (errno == EINTR || errno == EAGAIN)) {
|
||||
retry = true;
|
||||
} else {
|
||||
return IOError(filename_, errno);
|
||||
}
|
||||
} else {
|
||||
cur_pos += w;
|
||||
}
|
||||
}
|
||||
return Status::OK();
|
||||
}
|
||||
|
@ -20,6 +20,7 @@ limitations under the License.
|
||||
#include "tensorflow/core/platform/path.h"
|
||||
#include "tensorflow/core/platform/str_util.h"
|
||||
#include "tensorflow/core/platform/test.h"
|
||||
#include "third_party/hadoop/hdfs.h"
|
||||
|
||||
namespace tensorflow {
|
||||
namespace {
|
||||
@ -273,6 +274,23 @@ TEST_F(HadoopFileSystemTest, HarRootPath) {
|
||||
EXPECT_EQ("har://hdfs-root/user/j.doe/my_archive.har", nn);
|
||||
EXPECT_EQ("/", path);
|
||||
}
|
||||
|
||||
TEST_F(HadoopFileSystemTest, WriteBigFile) {
|
||||
const string fname = TmpDir("BigFile");
|
||||
const size_t file_len =
|
||||
static_cast<size_t>(std::numeric_limits<tSize>::max()) + 1024;
|
||||
// Fake a test string .
|
||||
char* p = new char[file_len];
|
||||
for (size_t i = 0; i < file_len; ++i) {
|
||||
*(p + i) = (i % 128);
|
||||
}
|
||||
string file_write_content(p, file_len);
|
||||
TF_ASSERT_OK(WriteString(fname, file_write_content));
|
||||
string file_read_content;
|
||||
TF_EXPECT_OK(ReadAll(fname, &file_read_content));
|
||||
EXPECT_EQ(file_write_content, file_read_content);
|
||||
delete p;
|
||||
}
|
||||
// NewAppendableFile() is not testable. Local filesystem maps to
|
||||
// ChecksumFileSystem in Hadoop, where appending is an unsupported operation.
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user