HDFS support

Notes:
- The test is tagged as manual, and you must download the Hadoop distribution to run it.
- We ask during ./configure whether to include HDFS support.
- Copied hdfs.h from Hadoop here in third_party. It's licensed Apache 2.0.
Change: 133615494
Jonathan Hseu 2016-09-19 11:14:58 -08:00 committed by TensorFlower Gardener
parent 7efb6906a1
commit 9f5b098f77
15 changed files with 1918 additions and 146 deletions
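
Usage sketch (not part of this change): once TensorFlow is configured with WITH_HDFS_SUPPORT, the hadoop_file_system.cc added below registers the "hdfs" scheme via REGISTER_FILE_SYSTEM, so hdfs:// paths can be opened through the generic Env API. The namenode address and file path here are hypothetical, and the sketch assumes HADOOP_HDFS_HOME/CLASSPATH point at a Hadoop distribution so libhdfs.so can be loaded at runtime.

// Hypothetical usage sketch -- not part of this commit.
#include <memory>
#include <string>

#include "tensorflow/core/lib/core/errors.h"
#include "tensorflow/core/lib/core/status.h"
#include "tensorflow/core/platform/env.h"
#include "tensorflow/core/platform/logging.h"

tensorflow::Status ReadFromHdfs() {
  tensorflow::Env* env = tensorflow::Env::Default();
  // Made-up namenode and path; the "hdfs" scheme is dispatched to
  // HadoopFileSystem by the REGISTER_FILE_SYSTEM call in this change.
  const std::string fname = "hdfs://namenode:8020/user/tf/example.txt";

  tensorflow::uint64 size = 0;
  TF_RETURN_IF_ERROR(env->GetFileSize(fname, &size));

  std::unique_ptr<tensorflow::RandomAccessFile> file;
  TF_RETURN_IF_ERROR(env->NewRandomAccessFile(fname, &file));

  std::string scratch(size, '\0');
  tensorflow::StringPiece result;
  TF_RETURN_IF_ERROR(file->Read(0, size, &result, &scratch[0]));
  LOG(INFO) << "Read " << result.size() << " bytes from " << fname;
  return tensorflow::Status::OK();
}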

configure

@@ -1,7 +1,5 @@
#!/usr/bin/env bash
DO_NOT_SUBMIT_WARNING="Unofficial setting. DO NOT SUBMIT!!!"
# Find out the absolute path to where ./configure resides
pushd `dirname $0` #> /dev/null
SOURCE_BASE_DIR=`pwd -P`
@@ -60,6 +58,28 @@ else
perl -pi -e "s,WITH_GCP_SUPPORT = (False|True),WITH_GCP_SUPPORT = False,s" tensorflow/core/platform/default/build_config.bzl
fi
while [ "$TF_NEED_HDFS" == "" ]; do
read -p "Do you wish to build TensorFlow with "\
"Hadoop File System support? [y/N] " INPUT
case $INPUT in
[Yy]* ) echo "Hadoop File System support will be enabled for "\
"TensorFlow"; TF_NEED_HDFS=1;;
[Nn]* ) echo "No Hadoop File System support will be enabled for "\
"TensorFlow"; TF_NEED_HDFS=0;;
"" ) echo "No Hadoop File System support will be enabled for "\
"TensorFlow"; TF_NEED_HDFS=0;;
* ) echo "Invalid selection: " $INPUT;;
esac
done
if [ "$TF_NEED_HDFS" == "1" ]; then
# Update Bazel build configuration.
perl -pi -e "s,WITH_HDFS_SUPPORT = (False|True),WITH_HDFS_SUPPORT = True,s" tensorflow/core/platform/default/build_config.bzl
else
# Update Bazel build configuration.
perl -pi -e "s,WITH_HDFS_SUPPORT = (False|True),WITH_HDFS_SUPPORT = False,s" tensorflow/core/platform/default/build_config.bzl
fi
## Find swig path
if [ -z "$SWIG_PATH" ]; then
SWIG_PATH=`type -p swig 2> /dev/null`

tensorflow/BUILD

@@ -131,6 +131,7 @@ filegroup(
"//tensorflow/core/ops/compat:all_files",
"//tensorflow/core/platform/cloud:all_files",
"//tensorflow/core/platform/default/build_config:all_files",
"//tensorflow/core/platform/hadoop:all_files",
"//tensorflow/core/util/ctc:all_files",
"//tensorflow/examples/android:all_files",
"//tensorflow/examples/how_tos/reading_data:all_files",
@@ -166,6 +167,7 @@ filegroup(
"//tensorflow/tensorboard/lib:all_files",
"//tensorflow/tensorboard/lib/python:all_files",
"//tensorflow/tensorboard/scripts:all_files",
"//tensorflow/third_party/hadoop:all_files",
"//tensorflow/tools/dist_test/server:all_files",
"//tensorflow/tools/docker:all_files",
"//tensorflow/tools/docker/notebooks:all_files",

@@ -5,6 +5,7 @@ tensorflow/core/platform/tensor_coding.cc
tensorflow/core/platform/protobuf_util.cc
tensorflow/core/platform/posix/posix_file_system.cc
tensorflow/core/platform/posix/port.cc
tensorflow/core/platform/posix/error.cc
tensorflow/core/platform/posix/env.cc
tensorflow/core/platform/load_library.cc
tensorflow/core/platform/file_system.cc

tensorflow/core/BUILD

@@ -630,6 +630,7 @@ filegroup(
"platform/default/test_benchmark.*",
"platform/cuda.h",
"platform/google/**/*",
"platform/hadoop/**/*",
"platform/jpeg.*",
"platform/png.*",
"platform/gif.*",

tensorflow/core/platform/default/build_config.bzl

@@ -3,8 +3,9 @@
load("@protobuf//:protobuf.bzl", "cc_proto_library")
load("@protobuf//:protobuf.bzl", "py_proto_library")
# configure may change the following line to True
# configure may change the following lines to True
WITH_GCP_SUPPORT = False
WITH_HDFS_SUPPORT = False
# Appends a suffix to a list of deps.
def tf_deps(deps, suffix):
@@ -123,5 +124,9 @@ def tf_kernel_tests_linkstatic():
return 0
def tf_additional_lib_deps():
return (["//tensorflow/core/platform/cloud:gcs_file_system"]
if WITH_GCP_SUPPORT else [])
deps = []
if WITH_GCP_SUPPORT:
deps.append("//tensorflow/core/platform/cloud:gcs_file_system")
if WITH_HDFS_SUPPORT:
deps.append("//tensorflow/core/platform/hadoop:hadoop_file_system")
return deps

tensorflow/core/platform/hadoop/BUILD

@@ -0,0 +1,69 @@
# Description:
# Hadoop file system implementation.
package(
default_visibility = ["//visibility:public"],
)
licenses(["notice"]) # Apache 2.0
load(
"//tensorflow:tensorflow.bzl",
"tf_cc_test",
)
filegroup(
name = "all_files",
srcs = glob(
["**/*"],
exclude = [
"**/METADATA",
"**/OWNERS",
],
),
visibility = ["//tensorflow:__subpackages__"],
)
cc_library(
name = "hadoop_file_system",
srcs = ["hadoop_file_system.cc"],
hdrs = ["hadoop_file_system.h"],
deps = [
"//tensorflow/core:lib",
"//tensorflow/core:lib_internal",
"//tensorflow/third_party/hadoop:hdfs",
],
alwayslink = 1,
)
# This test is set to manual because it requires downloading the Hadoop
# distribution to run. To run this test:
# 1. Ensure $JAVA_HOME is set.
# 2. Download the binary Hadoop distribution from:
# http://hadoop.apache.org/releases.html
# 3. Extract the Hadoop distribution and run:
# source libexec/hadoop-config.sh
# 4. bazel test \
# --test_env=LD_LIBRARY_PATH=$JAVA_HOME/jre/lib/amd64/server \
# --test_env=HADOOP_HDFS_HOME=$HADOOP_HDFS_HOME \
# --test_env=CLASSPATH=$($HADOOP_HDFS_HOME/bin/hadoop classpath --glob) \
# --test_strategy=local \
# :hadoop_file_system_test
tf_cc_test(
name = "hadoop_file_system_test",
size = "small",
srcs = [
"hadoop_file_system_test.cc",
],
tags = [
"manual",
"notap",
],
deps = [
":hadoop_file_system",
"//tensorflow/core:lib",
"//tensorflow/core:lib_internal",
"//tensorflow/core:test",
"//tensorflow/core:test_main",
],
)

tensorflow/core/platform/hadoop/hadoop_file_system.cc

@@ -0,0 +1,431 @@
/* Copyright 2016 The TensorFlow Authors. All Rights Reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
==============================================================================*/
#include "tensorflow/core/platform/hadoop/hadoop_file_system.h"
#include <errno.h>
#include "tensorflow/core/lib/core/status.h"
#include "tensorflow/core/lib/io/path.h"
#include "tensorflow/core/lib/strings/strcat.h"
#include "tensorflow/core/platform/env.h"
#include "tensorflow/core/platform/logging.h"
#include "tensorflow/core/platform/posix/error.h"
#include "tensorflow/third_party/hadoop/hdfs.h"
namespace tensorflow {
template <typename R, typename... Args>
Status BindFunc(void* handle, const char* name,
std::function<R(Args...)>* func) {
void* symbol_ptr = nullptr;
TF_RETURN_IF_ERROR(
Env::Default()->GetSymbolFromLibrary(handle, name, &symbol_ptr));
*func = reinterpret_cast<R (*)(Args...)>(symbol_ptr);
return Status::OK();
}
class LibHDFS {
public:
static LibHDFS* Load() {
static LibHDFS* lib = []() -> LibHDFS* {
LibHDFS* lib = new LibHDFS;
lib->LoadAndBind();
return lib;
}();
return lib;
}
// The status, if any, from failure to load.
Status status() { return status_; }
std::function<hdfsFS(hdfsBuilder*)> hdfsBuilderConnect;
std::function<hdfsBuilder*()> hdfsNewBuilder;
std::function<void(hdfsBuilder*, const char*)> hdfsBuilderSetNameNode;
std::function<int(hdfsFS, hdfsFile)> hdfsCloseFile;
std::function<tSize(hdfsFS, hdfsFile, tOffset, void*, tSize)> hdfsPread;
std::function<tSize(hdfsFS, hdfsFile, const void*, tSize)> hdfsWrite;
std::function<int(hdfsFS, hdfsFile)> hdfsFlush;
std::function<int(hdfsFS, hdfsFile)> hdfsHSync;
std::function<hdfsFile(hdfsFS, const char*, int, int, short, tSize)>
hdfsOpenFile;
std::function<int(hdfsFS, const char*)> hdfsExists;
std::function<hdfsFileInfo*(hdfsFS, const char*, int*)> hdfsListDirectory;
std::function<void(hdfsFileInfo*, int)> hdfsFreeFileInfo;
std::function<int(hdfsFS, const char*, int recursive)> hdfsDelete;
std::function<int(hdfsFS, const char*)> hdfsCreateDirectory;
std::function<hdfsFileInfo*(hdfsFS, const char*)> hdfsGetPathInfo;
std::function<int(hdfsFS, const char*, const char*)> hdfsRename;
private:
void LoadAndBind() {
auto TryLoadAndBind = [this](const char* name, void** handle) -> Status {
TF_RETURN_IF_ERROR(Env::Default()->LoadLibrary(name, handle));
#define BIND_HDFS_FUNC(function) \
TF_RETURN_IF_ERROR(BindFunc(*handle, #function, &function));
BIND_HDFS_FUNC(hdfsBuilderConnect);
BIND_HDFS_FUNC(hdfsNewBuilder);
BIND_HDFS_FUNC(hdfsBuilderSetNameNode);
BIND_HDFS_FUNC(hdfsCloseFile);
BIND_HDFS_FUNC(hdfsPread);
BIND_HDFS_FUNC(hdfsWrite);
BIND_HDFS_FUNC(hdfsFlush);
BIND_HDFS_FUNC(hdfsHSync);
BIND_HDFS_FUNC(hdfsOpenFile);
BIND_HDFS_FUNC(hdfsExists);
BIND_HDFS_FUNC(hdfsListDirectory);
BIND_HDFS_FUNC(hdfsFreeFileInfo);
BIND_HDFS_FUNC(hdfsDelete);
BIND_HDFS_FUNC(hdfsCreateDirectory);
BIND_HDFS_FUNC(hdfsGetPathInfo);
BIND_HDFS_FUNC(hdfsRename);
#undef BIND_HDFS_FUNC
return Status::OK();
};
// libhdfs.so won't be in the standard locations. Use the path as specified
// in the libhdfs documentation.
char* hdfs_home = getenv("HADOOP_HDFS_HOME");
if (hdfs_home == nullptr) {
status_ = errors::FailedPrecondition(
"Environment variable HADOOP_HDFS_HOME not set");
return;
}
string path = io::JoinPath(hdfs_home, "lib", "native", "libhdfs.so");
status_ = TryLoadAndBind(path.c_str(), &handle_);
return;
}
Status status_;
void* handle_ = nullptr;
};
HadoopFileSystem::HadoopFileSystem() : hdfs_(LibHDFS::Load()) {}
HadoopFileSystem::~HadoopFileSystem() {}
// We rely on HDFS connection caching here. The HDFS client calls
// org.apache.hadoop.fs.FileSystem.get(), which caches the connection
// internally.
Status HadoopFileSystem::Connect(StringPiece fname, hdfsFS* fs) {
TF_RETURN_IF_ERROR(hdfs_->status());
if (!fname.Consume("hdfs://")) {
return errors::InvalidArgument("HDFS path must start with hdfs://");
}
auto first_slash = fname.find('/');
string namenode;
if (first_slash == string::npos) {
namenode = fname.ToString();
} else {
namenode = fname.substr(0, first_slash).ToString();
}
hdfsBuilder* builder = hdfs_->hdfsNewBuilder();
if (namenode == "localfilesystem") {
hdfs_->hdfsBuilderSetNameNode(builder, nullptr);
} else {
hdfs_->hdfsBuilderSetNameNode(builder, namenode.c_str());
}
*fs = hdfs_->hdfsBuilderConnect(builder);
if (*fs == nullptr) {
return errors::NotFound(strerror(errno));
}
return Status::OK();
}
string HadoopFileSystem::TranslateName(const string& name) const {
StringPiece sp = name;
sp.Consume("hdfs://");
auto first_slash = sp.find('/');
if (first_slash == string::npos) {
return string();
}
sp.remove_prefix(first_slash);
return sp.ToString();
}
class HDFSRandomAccessFile : public RandomAccessFile {
public:
HDFSRandomAccessFile(const string& fname, LibHDFS* hdfs, hdfsFS fs,
hdfsFile file)
: filename_(fname), hdfs_(hdfs), fs_(fs), file_(file) {}
~HDFSRandomAccessFile() override { hdfs_->hdfsCloseFile(fs_, file_); }
Status Read(uint64 offset, size_t n, StringPiece* result,
char* scratch) const override {
Status s;
char* dst = scratch;
while (n > 0 && s.ok()) {
tSize r = hdfs_->hdfsPread(fs_, file_, static_cast<tOffset>(offset), dst,
static_cast<tSize>(n));
if (r > 0) {
dst += r;
n -= r;
offset += r;
} else if (r == 0) {
s = Status(error::OUT_OF_RANGE, "Read less bytes than requested");
} else if (errno == EINTR || errno == EAGAIN) {
// hdfsPread may return EINTR too. Just retry.
} else {
s = IOError(filename_, errno);
}
}
*result = StringPiece(scratch, dst - scratch);
return s;
}
private:
string filename_;
LibHDFS* hdfs_;
hdfsFS fs_;
hdfsFile file_;
};
Status HadoopFileSystem::NewRandomAccessFile(
const string& fname, std::unique_ptr<RandomAccessFile>* result) {
hdfsFS fs = nullptr;
TF_RETURN_IF_ERROR(Connect(fname, &fs));
hdfsFile file =
hdfs_->hdfsOpenFile(fs, TranslateName(fname).c_str(), O_RDONLY, 0, 0, 0);
if (file == nullptr) {
return IOError(fname, errno);
}
result->reset(new HDFSRandomAccessFile(fname, hdfs_, fs, file));
return Status::OK();
}
class HDFSWritableFile : public WritableFile {
public:
HDFSWritableFile(const string& fname, LibHDFS* hdfs, hdfsFS fs, hdfsFile file)
: filename_(fname), hdfs_(hdfs), fs_(fs), file_(file) {}
~HDFSWritableFile() override {
if (file_ != nullptr) {
Close();
}
}
Status Append(const StringPiece& data) override {
if (hdfs_->hdfsWrite(fs_, file_, data.data(),
static_cast<tSize>(data.size())) == -1) {
return IOError(filename_, errno);
}
return Status::OK();
}
Status Close() override {
Status result;
if (hdfs_->hdfsCloseFile(fs_, file_) != 0) {
result = IOError(filename_, errno);
}
hdfs_ = nullptr;
fs_ = nullptr;
file_ = nullptr;
return result;
}
Status Flush() override {
if (hdfs_->hdfsFlush(fs_, file_) != 0) {
return IOError(filename_, errno);
}
return Status::OK();
}
Status Sync() override {
if (hdfs_->hdfsHSync(fs_, file_) != 0) {
return IOError(filename_, errno);
}
return Status::OK();
}
private:
string filename_;
LibHDFS* hdfs_;
hdfsFS fs_;
hdfsFile file_;
};
Status HadoopFileSystem::NewWritableFile(
const string& fname, std::unique_ptr<WritableFile>* result) {
hdfsFS fs = nullptr;
TF_RETURN_IF_ERROR(Connect(fname, &fs));
hdfsFile file =
hdfs_->hdfsOpenFile(fs, TranslateName(fname).c_str(), O_WRONLY, 0, 0, 0);
if (file == nullptr) {
return IOError(fname, errno);
}
result->reset(new HDFSWritableFile(fname, hdfs_, fs, file));
return Status::OK();
}
Status HadoopFileSystem::NewAppendableFile(
const string& fname, std::unique_ptr<WritableFile>* result) {
hdfsFS fs = nullptr;
TF_RETURN_IF_ERROR(Connect(fname, &fs));
hdfsFile file = hdfs_->hdfsOpenFile(fs, TranslateName(fname).c_str(),
O_WRONLY | O_APPEND, 0, 0, 0);
if (file == nullptr) {
return IOError(fname, errno);
}
result->reset(new HDFSWritableFile(fname, hdfs_, fs, file));
return Status::OK();
}
Status HadoopFileSystem::NewReadOnlyMemoryRegionFromFile(
const string& fname, std::unique_ptr<ReadOnlyMemoryRegion>* result) {
// hadoopReadZero() technically supports this call with the following
// caveats:
// - It only works up to 2 GB. We'd have to Stat() the file to ensure that
// it fits.
// - If not on the local filesystem, the entire file will be read, making
// it inefficient for callers that assume typical mmap() behavior.
return errors::Unimplemented("HDFS does not support ReadOnlyMemoryRegion");
}
bool HadoopFileSystem::FileExists(const string& fname) {
hdfsFS fs = nullptr;
Status status = Connect(fname, &fs);
if (!status.ok()) {
LOG(ERROR) << "Connect failed: " << status.error_message();
return false;
}
return hdfs_->hdfsExists(fs, TranslateName(fname).c_str()) == 0;
}
Status HadoopFileSystem::GetChildren(const string& dir,
std::vector<string>* result) {
result->clear();
hdfsFS fs = nullptr;
TF_RETURN_IF_ERROR(Connect(dir, &fs));
// hdfsListDirectory returns nullptr if the directory is empty. Do a separate
// check to verify the directory exists first.
FileStatistics stat;
TF_RETURN_IF_ERROR(Stat(dir, &stat));
int entries = 0;
hdfsFileInfo* info =
hdfs_->hdfsListDirectory(fs, TranslateName(dir).c_str(), &entries);
if (info == nullptr) {
if (stat.is_directory) {
// Assume it's an empty directory.
return Status::OK();
}
return IOError(dir, errno);
}
for (int i = 0; i < entries; i++) {
result->push_back(io::Basename(info[i].mName).ToString());
}
hdfs_->hdfsFreeFileInfo(info, entries);
return Status::OK();
}
Status HadoopFileSystem::DeleteFile(const string& fname) {
hdfsFS fs = nullptr;
TF_RETURN_IF_ERROR(Connect(fname, &fs));
if (hdfs_->hdfsDelete(fs, TranslateName(fname).c_str(),
/*recursive=*/0) != 0) {
return IOError(fname, errno);
}
return Status::OK();
}
Status HadoopFileSystem::CreateDir(const string& dir) {
hdfsFS fs = nullptr;
TF_RETURN_IF_ERROR(Connect(dir, &fs));
if (hdfs_->hdfsCreateDirectory(fs, TranslateName(dir).c_str()) != 0) {
return IOError(dir, errno);
}
return Status::OK();
}
Status HadoopFileSystem::DeleteDir(const string& dir) {
hdfsFS fs = nullptr;
TF_RETURN_IF_ERROR(Connect(dir, &fs));
// Count the number of entries in the directory, and only delete if it's
// empty. This is consistent with the interface, but note that there's
// a race condition where a file may be added after this check, in which
// case the directory will still be deleted.
int entries = 0;
hdfsFileInfo* info =
hdfs_->hdfsListDirectory(fs, TranslateName(dir).c_str(), &entries);
if (info != nullptr) {
hdfs_->hdfsFreeFileInfo(info, entries);
}
if (entries > 0) {
return errors::FailedPrecondition("Cannot delete a non-empty directory.");
}
if (hdfs_->hdfsDelete(fs, TranslateName(dir).c_str(),
/*recursive=*/1) != 0) {
return IOError(dir, errno);
}
return Status::OK();
}
Status HadoopFileSystem::GetFileSize(const string& fname, uint64* size) {
hdfsFS fs = nullptr;
TF_RETURN_IF_ERROR(Connect(fname, &fs));
hdfsFileInfo* info = hdfs_->hdfsGetPathInfo(fs, TranslateName(fname).c_str());
if (info == nullptr) {
return IOError(fname, errno);
}
*size = static_cast<uint64>(info->mSize);
hdfs_->hdfsFreeFileInfo(info, 1);
return Status::OK();
}
Status HadoopFileSystem::RenameFile(const string& src, const string& target) {
hdfsFS fs = nullptr;
TF_RETURN_IF_ERROR(Connect(src, &fs));
if (hdfs_->hdfsRename(fs, TranslateName(src).c_str(),
TranslateName(target).c_str()) != 0) {
return IOError(src, errno);
}
return Status::OK();
}
Status HadoopFileSystem::Stat(const string& fname, FileStatistics* stats) {
hdfsFS fs = nullptr;
TF_RETURN_IF_ERROR(Connect(fname, &fs));
hdfsFileInfo* info = hdfs_->hdfsGetPathInfo(fs, TranslateName(fname).c_str());
if (info == nullptr) {
return IOError(fname, errno);
}
stats->length = static_cast<int64>(info->mSize);
stats->mtime_nsec = static_cast<int64>(info->mLastMod) * 1e9;
stats->is_directory = info->mKind == kObjectKindDirectory;
hdfs_->hdfsFreeFileInfo(info, 1);
return Status::OK();
}
REGISTER_FILE_SYSTEM("hdfs", HadoopFileSystem);
} // namespace tensorflow
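
For reference (not part of the change): Connect() above splits an hdfs:// URI at the first slash after the scheme, handing the prefix to hdfsBuilderSetNameNode (with the special name "localfilesystem" mapping to NULL, i.e. Hadoop's local filesystem), while TranslateName() keeps only the remainder. A small sketch with made-up URIs:

// Illustrative only; the URIs are hypothetical. TranslateName() does not need
// a running cluster, so this can be checked without connecting.
#include <cassert>
#include "tensorflow/core/platform/hadoop/hadoop_file_system.h"

void TranslateNameExamples() {
  tensorflow::HadoopFileSystem hdfs;
  // "namenode:8020" would be passed to hdfsBuilderSetNameNode by Connect();
  // everything from the first '/' onward is the path handed to libhdfs.
  assert(hdfs.TranslateName("hdfs://namenode:8020/user/tf/data.txt") ==
         "/user/tf/data.txt");
  // The magic host "localfilesystem" makes Connect() pass NULL to
  // hdfsBuilderSetNameNode, selecting Hadoop's local filesystem; the test
  // below relies on this.
  assert(hdfs.TranslateName("hdfs://localfilesystem/tmp/foo") == "/tmp/foo");
}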

tensorflow/core/platform/hadoop/hadoop_file_system.h

@@ -0,0 +1,73 @@
/* Copyright 2016 The TensorFlow Authors. All Rights Reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
==============================================================================*/
#ifndef THIRD_PARTY_TENSORFLOW_CORE_PLATFORM_HADOOP_HADOOP_FILE_SYSTEM_H_
#define THIRD_PARTY_TENSORFLOW_CORE_PLATFORM_HADOOP_HADOOP_FILE_SYSTEM_H_
#include "tensorflow/core/platform/env.h"
extern "C" {
struct hdfs_internal;
typedef hdfs_internal* hdfsFS;
}
namespace tensorflow {
class LibHDFS;
class HadoopFileSystem : public FileSystem {
public:
HadoopFileSystem();
~HadoopFileSystem();
Status NewRandomAccessFile(
const string& fname, std::unique_ptr<RandomAccessFile>* result) override;
Status NewWritableFile(const string& fname,
std::unique_ptr<WritableFile>* result) override;
Status NewAppendableFile(const string& fname,
std::unique_ptr<WritableFile>* result) override;
Status NewReadOnlyMemoryRegionFromFile(
const string& fname,
std::unique_ptr<ReadOnlyMemoryRegion>* result) override;
bool FileExists(const string& fname) override;
Status GetChildren(const string& dir, std::vector<string>* result) override;
Status DeleteFile(const string& fname) override;
Status CreateDir(const string& name) override;
Status DeleteDir(const string& name) override;
Status GetFileSize(const string& fname, uint64* size) override;
Status RenameFile(const string& src, const string& target) override;
Status Stat(const string& fname, FileStatistics* stat) override;
string TranslateName(const string& name) const override;
private:
Status Connect(StringPiece fname, hdfsFS* fs);
LibHDFS* hdfs_;
};
} // namespace tensorflow
#endif // THIRD_PARTY_TENSORFLOW_CORE_PLATFORM_HADOOP_HADOOP_FILE_SYSTEM_H_

tensorflow/core/platform/hadoop/hadoop_file_system_test.cc

@@ -0,0 +1,186 @@
/* Copyright 2016 The TensorFlow Authors. All Rights Reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
==============================================================================*/
#include "tensorflow/core/platform/hadoop/hadoop_file_system.h"
#include "tensorflow/core/lib/core/status_test_util.h"
#include "tensorflow/core/lib/gtl/stl_util.h"
#include "tensorflow/core/lib/io/path.h"
#include "tensorflow/core/platform/file_system.h"
#include "tensorflow/core/platform/test.h"
namespace tensorflow {
namespace {
class HadoopFileSystemTest : public ::testing::Test {
protected:
HadoopFileSystemTest() {}
Status WriteString(const string& fname, const string& content) {
std::unique_ptr<WritableFile> writer;
TF_RETURN_IF_ERROR(hdfs.NewWritableFile(fname, &writer));
TF_RETURN_IF_ERROR(writer->Append(content));
TF_RETURN_IF_ERROR(writer->Close());
return Status::OK();
}
Status ReadAll(const string& fname, string* content) {
std::unique_ptr<RandomAccessFile> reader;
TF_RETURN_IF_ERROR(hdfs.NewRandomAccessFile(fname, &reader));
uint64 file_size = 0;
TF_RETURN_IF_ERROR(hdfs.GetFileSize(fname, &file_size));
content->resize(file_size);
StringPiece result;
TF_RETURN_IF_ERROR(
reader->Read(0, file_size, &result, gtl::string_as_array(content)));
if (file_size != result.size()) {
return errors::DataLoss("expected ", file_size, " got ", result.size(),
" bytes");
}
return Status::OK();
}
HadoopFileSystem hdfs;
};
TEST_F(HadoopFileSystemTest, RandomAccessFile) {
const string fname = io::JoinPath("hdfs://localfilesystem", testing::TmpDir(),
"RandomAccessFile");
const string content = "abcdefghijklmn";
TF_ASSERT_OK(WriteString(fname, content));
std::unique_ptr<RandomAccessFile> reader;
TF_EXPECT_OK(hdfs.NewRandomAccessFile(fname, &reader));
string got;
got.resize(content.size());
StringPiece result;
TF_EXPECT_OK(
reader->Read(0, content.size(), &result, gtl::string_as_array(&got)));
EXPECT_EQ(content.size(), result.size());
EXPECT_EQ(content, result);
got.clear();
got.resize(4);
TF_EXPECT_OK(reader->Read(2, 4, &result, gtl::string_as_array(&got)));
EXPECT_EQ(4, result.size());
EXPECT_EQ(content.substr(2, 4), result);
}
TEST_F(HadoopFileSystemTest, WritableFile) {
std::unique_ptr<WritableFile> writer;
const string fname =
io::JoinPath("hdfs://localfilesystem", testing::TmpDir(), "WritableFile");
TF_EXPECT_OK(hdfs.NewWritableFile(fname, &writer));
TF_EXPECT_OK(writer->Append("content1,"));
TF_EXPECT_OK(writer->Append("content2"));
TF_EXPECT_OK(writer->Flush());
TF_EXPECT_OK(writer->Sync());
TF_EXPECT_OK(writer->Close());
string content;
TF_EXPECT_OK(ReadAll(fname, &content));
EXPECT_EQ("content1,content2", content);
}
TEST_F(HadoopFileSystemTest, FileExists) {
const string fname =
io::JoinPath("hdfs://localfilesystem", testing::TmpDir(), "FileExists");
EXPECT_FALSE(hdfs.FileExists(fname));
TF_ASSERT_OK(WriteString(fname, "test"));
EXPECT_TRUE(hdfs.FileExists(fname));
}
TEST_F(HadoopFileSystemTest, GetChildren) {
const string base =
io::JoinPath("hdfs://localfilesystem", testing::TmpDir(), "GetChildren");
TF_EXPECT_OK(hdfs.CreateDir(base));
const string file = io::JoinPath(base, "testfile.csv");
TF_EXPECT_OK(WriteString(file, "blah"));
const string subdir = io::JoinPath(base, "subdir");
TF_EXPECT_OK(hdfs.CreateDir(subdir));
std::vector<string> children;
TF_EXPECT_OK(hdfs.GetChildren(base, &children));
std::sort(children.begin(), children.end());
EXPECT_EQ(std::vector<string>({"subdir", "testfile.csv"}), children);
}
TEST_F(HadoopFileSystemTest, DeleteFile) {
const string fname =
io::JoinPath("hdfs://localfilesystem", testing::TmpDir(), "DeleteFile");
EXPECT_FALSE(hdfs.DeleteFile(fname).ok());
TF_ASSERT_OK(WriteString(fname, "test"));
TF_EXPECT_OK(hdfs.DeleteFile(fname));
}
TEST_F(HadoopFileSystemTest, GetFileSize) {
const string fname =
io::JoinPath("hdfs://localfilesystem", testing::TmpDir(), "GetFileSize");
TF_ASSERT_OK(WriteString(fname, "test"));
uint64 file_size = 0;
TF_EXPECT_OK(hdfs.GetFileSize(fname, &file_size));
EXPECT_EQ(4, file_size);
}
TEST_F(HadoopFileSystemTest, CreateDirStat) {
const string dir = io::JoinPath("hdfs://localfilesystem", testing::TmpDir(),
"CreateDirStat");
TF_EXPECT_OK(hdfs.CreateDir(dir));
FileStatistics stat;
TF_EXPECT_OK(hdfs.Stat(dir, &stat));
EXPECT_TRUE(stat.is_directory);
}
TEST_F(HadoopFileSystemTest, DeleteDir) {
const string dir =
io::JoinPath("hdfs://localfilesystem", testing::TmpDir(), "DeleteDir");
EXPECT_FALSE(hdfs.DeleteDir(dir).ok());
TF_EXPECT_OK(hdfs.CreateDir(dir));
TF_EXPECT_OK(hdfs.DeleteDir(dir));
FileStatistics stat;
EXPECT_FALSE(hdfs.Stat(dir, &stat).ok());
}
TEST_F(HadoopFileSystemTest, RenameFile) {
const string fname1 =
io::JoinPath("hdfs://localfilesystem", testing::TmpDir(), "RenameFile1");
const string fname2 =
io::JoinPath("hdfs://localfilesystem", testing::TmpDir(), "RenameFile2");
TF_ASSERT_OK(WriteString(fname1, "test"));
TF_EXPECT_OK(hdfs.RenameFile(fname1, fname2));
string content;
TF_EXPECT_OK(ReadAll(fname2, &content));
EXPECT_EQ("test", content);
}
TEST_F(HadoopFileSystemTest, StatFile) {
const string fname =
io::JoinPath("hdfs://localfilesystem", testing::TmpDir(), "StatFile");
TF_ASSERT_OK(WriteString(fname, "test"));
FileStatistics stat;
TF_EXPECT_OK(hdfs.Stat(fname, &stat));
EXPECT_EQ(4, stat.length);
EXPECT_FALSE(stat.is_directory);
}
// NewAppendableFile() is not testable. Local filesystem maps to
// ChecksumFileSystem in Hadoop, where appending is an unsupported operation.
} // namespace
} // namespace tensorflow

tensorflow/core/platform/posix/error.cc

@@ -0,0 +1,163 @@
/* Copyright 2016 The TensorFlow Authors. All Rights Reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
==============================================================================*/
#include "tensorflow/core/platform/posix/error.h"
#include <errno.h>
#include <string.h>
#include "tensorflow/core/lib/core/status.h"
#include "tensorflow/core/lib/strings/strcat.h"
namespace tensorflow {
error::Code ErrnoToCode(int err_number) {
error::Code code;
switch (err_number) {
case 0:
code = error::OK;
break;
case EINVAL: // Invalid argument
case ENAMETOOLONG: // Filename too long
case E2BIG: // Argument list too long
case EDESTADDRREQ: // Destination address required
case EDOM: // Mathematics argument out of domain of function
case EFAULT: // Bad address
case EILSEQ: // Illegal byte sequence
case ENOPROTOOPT: // Protocol not available
case ENOSTR: // Not a STREAM
case ENOTSOCK: // Not a socket
case ENOTTY: // Inappropriate I/O control operation
case EPROTOTYPE: // Protocol wrong type for socket
case ESPIPE: // Invalid seek
code = error::INVALID_ARGUMENT;
break;
case ETIMEDOUT: // Connection timed out
case ETIME: // Timer expired
code = error::DEADLINE_EXCEEDED;
break;
case ENODEV: // No such device
case ENOENT: // No such file or directory
case ENXIO: // No such device or address
case ESRCH: // No such process
code = error::NOT_FOUND;
break;
case EEXIST: // File exists
case EADDRNOTAVAIL: // Address not available
case EALREADY: // Connection already in progress
code = error::ALREADY_EXISTS;
break;
case EPERM: // Operation not permitted
case EACCES: // Permission denied
case EROFS: // Read only file system
code = error::PERMISSION_DENIED;
break;
case ENOTEMPTY: // Directory not empty
case EISDIR: // Is a directory
case ENOTDIR: // Not a directory
case EADDRINUSE: // Address already in use
case EBADF: // Invalid file descriptor
case EBUSY: // Device or resource busy
case ECHILD: // No child processes
case EISCONN: // Socket is connected
case ENOTBLK: // Block device required
case ENOTCONN: // The socket is not connected
case EPIPE: // Broken pipe
case ESHUTDOWN: // Cannot send after transport endpoint shutdown
case ETXTBSY: // Text file busy
code = error::FAILED_PRECONDITION;
break;
case ENOSPC: // No space left on device
case EDQUOT: // Disk quota exceeded
case EMFILE: // Too many open files
case EMLINK: // Too many links
case ENFILE: // Too many open files in system
case ENOBUFS: // No buffer space available
case ENODATA: // No message is available on the STREAM read queue
case ENOMEM: // Not enough space
case ENOSR: // No STREAM resources
case EUSERS: // Too many users
code = error::RESOURCE_EXHAUSTED;
break;
case EFBIG: // File too large
case EOVERFLOW: // Value too large to be stored in data type
case ERANGE: // Result too large
code = error::OUT_OF_RANGE;
break;
case ENOSYS: // Function not implemented
case ENOTSUP: // Operation not supported
case EAFNOSUPPORT: // Address family not supported
case EPFNOSUPPORT: // Protocol family not supported
case EPROTONOSUPPORT: // Protocol not supported
case ESOCKTNOSUPPORT: // Socket type not supported
case EXDEV: // Improper link
code = error::UNIMPLEMENTED;
break;
case EAGAIN: // Resource temporarily unavailable
case ECONNREFUSED: // Connection refused
case ECONNABORTED: // Connection aborted
case ECONNRESET: // Connection reset
case EINTR: // Interrupted function call
case EHOSTDOWN: // Host is down
case EHOSTUNREACH: // Host is unreachable
case ENETDOWN: // Network is down
case ENETRESET: // Connection aborted by network
case ENETUNREACH: // Network unreachable
case ENOLCK: // No locks available
case ENOLINK: // Link has been severed
#if !defined(__APPLE__)
case ENONET: // Machine is not on the network
#endif
code = error::UNAVAILABLE;
break;
case EDEADLK: // Resource deadlock avoided
case ESTALE: // Stale file handle
code = error::ABORTED;
break;
case ECANCELED: // Operation cancelled
code = error::CANCELLED;
break;
// NOTE: If you get any of the following (especially in a
// reproducible way) and can propose a better mapping,
// please email the owners about updating this mapping.
case EBADMSG: // Bad message
case EIDRM: // Identifier removed
case EINPROGRESS: // Operation in progress
case EIO: // I/O error
case ELOOP: // Too many levels of symbolic links
case ENOEXEC: // Exec format error
case ENOMSG: // No message of the desired type
case EPROTO: // Protocol error
case EREMOTE: // Object is remote
code = error::UNKNOWN;
break;
default: {
code = error::UNKNOWN;
break;
}
}
return code;
}
Status IOError(const string& context, int err_number) {
auto code = ErrnoToCode(err_number);
if (code == error::UNKNOWN) {
return Status(code, strings::StrCat(context, "; ", strerror(err_number)));
} else {
return Status(code, context);
}
}
} // namespace tensorflow
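
The errno-to-Status mapping is factored out of posix_file_system.cc (see the deletion below) so both the POSIX and Hadoop filesystems can share IOError(). A small sketch of the resulting behavior, with made-up paths:

// Illustrative only; the paths are made up.
#include <cerrno>
#include "tensorflow/core/platform/posix/error.h"

void IOErrorExamples() {
  // ENOENT maps to a canonical code, so only the context string is kept.
  tensorflow::Status missing = tensorflow::IOError("/no/such/file", ENOENT);
  // missing.code() == tensorflow::error::NOT_FOUND

  // EIO falls through to UNKNOWN, so the strerror() text is appended, e.g.
  // "/dev/broken; Input/output error".
  tensorflow::Status io_err = tensorflow::IOError("/dev/broken", EIO);
  // io_err.code() == tensorflow::error::UNKNOWN
}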

tensorflow/core/platform/posix/error.h

@@ -0,0 +1,27 @@
/* Copyright 2016 The TensorFlow Authors. All Rights Reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
==============================================================================*/
#ifndef TENSORFLOW_CORE_PLATFORM_POSIX_ERROR_H_
#define TENSORFLOW_CORE_PLATFORM_POSIX_ERROR_H_
#include "tensorflow/core/lib/core/status.h"
namespace tensorflow {
Status IOError(const string& context, int err_number);
} // namespace tensorflow
#endif // TENSORFLOW_CORE_PLATFORM_POSIX_ERROR_H_

tensorflow/core/platform/posix/posix_file_system.cc

@@ -25,143 +25,15 @@ limitations under the License.
#include <unistd.h>
#include "tensorflow/core/lib/core/error_codes.pb.h"
#include "tensorflow/core/lib/core/status.h"
#include "tensorflow/core/lib/strings/strcat.h"
#include "tensorflow/core/platform/env.h"
#include "tensorflow/core/platform/logging.h"
#include "tensorflow/core/platform/posix/error.h"
#include "tensorflow/core/platform/posix/posix_file_system.h"
namespace tensorflow {
namespace {
error::Code ErrnoToCode(int err_number) {
error::Code code;
switch (err_number) {
case 0:
code = error::OK;
break;
case EINVAL: // Invalid argument
case ENAMETOOLONG: // Filename too long
case E2BIG: // Argument list too long
case EDESTADDRREQ: // Destination address required
case EDOM: // Mathematics argument out of domain of function
case EFAULT: // Bad address
case EILSEQ: // Illegal byte sequence
case ENOPROTOOPT: // Protocol not available
case ENOSTR: // Not a STREAM
case ENOTSOCK: // Not a socket
case ENOTTY: // Inappropriate I/O control operation
case EPROTOTYPE: // Protocol wrong type for socket
case ESPIPE: // Invalid seek
code = error::INVALID_ARGUMENT;
break;
case ETIMEDOUT: // Connection timed out
case ETIME: // Timer expired
code = error::DEADLINE_EXCEEDED;
break;
case ENODEV: // No such device
case ENOENT: // No such file or directory
case ENXIO: // No such device or address
case ESRCH: // No such process
code = error::NOT_FOUND;
break;
case EEXIST: // File exists
case EADDRNOTAVAIL: // Address not available
case EALREADY: // Connection already in progress
code = error::ALREADY_EXISTS;
break;
case EPERM: // Operation not permitted
case EACCES: // Permission denied
case EROFS: // Read only file system
code = error::PERMISSION_DENIED;
break;
case ENOTEMPTY: // Directory not empty
case EISDIR: // Is a directory
case ENOTDIR: // Not a directory
case EADDRINUSE: // Address already in use
case EBADF: // Invalid file descriptor
case EBUSY: // Device or resource busy
case ECHILD: // No child processes
case EISCONN: // Socket is connected
case ENOTBLK: // Block device required
case ENOTCONN: // The socket is not connected
case EPIPE: // Broken pipe
case ESHUTDOWN: // Cannot send after transport endpoint shutdown
case ETXTBSY: // Text file busy
code = error::FAILED_PRECONDITION;
break;
case ENOSPC: // No space left on device
case EDQUOT: // Disk quota exceeded
case EMFILE: // Too many open files
case EMLINK: // Too many links
case ENFILE: // Too many open files in system
case ENOBUFS: // No buffer space available
case ENODATA: // No message is available on the STREAM read queue
case ENOMEM: // Not enough space
case ENOSR: // No STREAM resources
case EUSERS: // Too many users
code = error::RESOURCE_EXHAUSTED;
break;
case EFBIG: // File too large
case EOVERFLOW: // Value too large to be stored in data type
case ERANGE: // Result too large
code = error::OUT_OF_RANGE;
break;
case ENOSYS: // Function not implemented
case ENOTSUP: // Operation not supported
case EAFNOSUPPORT: // Address family not supported
case EPFNOSUPPORT: // Protocol family not supported
case EPROTONOSUPPORT: // Protocol not supported
case ESOCKTNOSUPPORT: // Socket type not supported
case EXDEV: // Improper link
code = error::UNIMPLEMENTED;
break;
case EAGAIN: // Resource temporarily unavailable
case ECONNREFUSED: // Connection refused
case ECONNABORTED: // Connection aborted
case ECONNRESET: // Connection reset
case EINTR: // Interrupted function call
case EHOSTDOWN: // Host is down
case EHOSTUNREACH: // Host is unreachable
case ENETDOWN: // Network is down
case ENETRESET: // Connection aborted by network
case ENETUNREACH: // Network unreachable
case ENOLCK: // No locks available
case ENOLINK: // Link has been severed
#if !defined(__APPLE__)
case ENONET: // Machine is not on the network
#endif
code = error::UNAVAILABLE;
break;
case EDEADLK: // Resource deadlock avoided
case ESTALE: // Stale file handle
code = error::ABORTED;
break;
case ECANCELED: // Operation cancelled
code = error::CANCELLED;
break;
// NOTE: If you get any of the following (especially in a
// reproducible way) and can propose a better mapping,
// please email the owners about updating this mapping.
case EBADMSG: // Bad message
case EIDRM: // Identifier removed
case EINPROGRESS: // Operation in progress
case EIO: // I/O error
case ELOOP: // Too many levels of symbolic links
case ENOEXEC: // Exec format error
case ENOMSG: // No message of the desired type
case EPROTO: // Protocol error
case EREMOTE: // Object is remote
code = error::UNKNOWN;
break;
default: {
code = error::UNKNOWN;
break;
}
}
return code;
}
// pread() based random-access
class PosixRandomAccessFile : public RandomAccessFile {
private:
@@ -258,8 +130,6 @@ class PosixReadOnlyMemoryRegion : public ReadOnlyMemoryRegion {
const uint64 length_;
};
} // namespace
Status PosixFileSystem::NewRandomAccessFile(
const string& fname, std::unique_ptr<RandomAccessFile>* result) {
string translated_fname = TranslateName(fname);
@@ -401,13 +271,4 @@ Status PosixFileSystem::RenameFile(const string& src, const string& target) {
return result;
}
Status IOError(const string& context, int err_number) {
auto code = ErrnoToCode(err_number);
if (code == error::UNKNOWN) {
return Status(code, strings::StrCat(context, "; ", strerror(err_number)));
} else {
return Status(code, context);
}
}
} // namespace tensorflow

tensorflow/third_party/hadoop/BUILD

@@ -0,0 +1,20 @@
package(default_visibility = ["//visibility:public"])
licenses(["notice"]) # Apache 2.0
filegroup(
name = "all_files",
srcs = glob(
["**/*"],
exclude = [
"**/METADATA",
"**/OWNERS",
],
),
visibility = ["//tensorflow:__subpackages__"],
)
cc_library(
name = "hdfs",
hdrs = ["hdfs.h"],
)

tensorflow/third_party/hadoop/hdfs.h

@@ -0,0 +1,911 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#ifndef LIBHDFS_HDFS_H
#define LIBHDFS_HDFS_H
#include <errno.h> /* for EINTERNAL, etc. */
#include <fcntl.h> /* for O_RDONLY, O_WRONLY */
#include <stdint.h> /* for uint64_t, etc. */
#include <time.h> /* for time_t */
/*
* Support export of DLL symbols during libhdfs build, and import of DLL symbols
* during client application build. A client application may optionally define
* symbol LIBHDFS_DLL_IMPORT in its build. This is not strictly required, but
* the compiler can produce more efficient code with it.
*/
#ifdef WIN32
#ifdef LIBHDFS_DLL_EXPORT
#define LIBHDFS_EXTERNAL __declspec(dllexport)
#elif LIBHDFS_DLL_IMPORT
#define LIBHDFS_EXTERNAL __declspec(dllimport)
#else
#define LIBHDFS_EXTERNAL
#endif
#else
#ifdef LIBHDFS_DLL_EXPORT
#define LIBHDFS_EXTERNAL __attribute__((visibility("default")))
#elif LIBHDFS_DLL_IMPORT
#define LIBHDFS_EXTERNAL __attribute__((visibility("default")))
#else
#define LIBHDFS_EXTERNAL
#endif
#endif
#ifndef O_RDONLY
#define O_RDONLY 1
#endif
#ifndef O_WRONLY
#define O_WRONLY 2
#endif
#ifndef EINTERNAL
#define EINTERNAL 255
#endif
#define ELASTIC_BYTE_BUFFER_POOL_CLASS \
"org/apache/hadoop/io/ElasticByteBufferPool"
/** All APIs set errno to meaningful values */
#ifdef __cplusplus
extern "C" {
#endif
/**
* Some utility decls used in libhdfs.
*/
struct hdfsBuilder;
typedef int32_t tSize; /// size of data for read/write io ops
typedef time_t tTime; /// time type in seconds
typedef int64_t tOffset; /// offset within the file
typedef uint16_t tPort; /// port
typedef enum tObjectKind {
kObjectKindFile = 'F',
kObjectKindDirectory = 'D',
} tObjectKind;
/**
* The C reflection of org.apache.org.hadoop.FileSystem .
*/
struct hdfs_internal;
typedef struct hdfs_internal *hdfsFS;
struct hdfsFile_internal;
typedef struct hdfsFile_internal *hdfsFile;
struct hadoopRzOptions;
struct hadoopRzBuffer;
/**
* Determine if a file is open for read.
*
* @param file The HDFS file
* @return 1 if the file is open for read; 0 otherwise
*/
LIBHDFS_EXTERNAL
int hdfsFileIsOpenForRead(hdfsFile file);
/**
* Determine if a file is open for write.
*
* @param file The HDFS file
* @return 1 if the file is open for write; 0 otherwise
*/
LIBHDFS_EXTERNAL
int hdfsFileIsOpenForWrite(hdfsFile file);
struct hdfsReadStatistics {
uint64_t totalBytesRead;
uint64_t totalLocalBytesRead;
uint64_t totalShortCircuitBytesRead;
uint64_t totalZeroCopyBytesRead;
};
/**
* Get read statistics about a file. This is only applicable to files
* opened for reading.
*
* @param file The HDFS file
* @param stats (out parameter) on a successful return, the read
* statistics. Unchanged otherwise. You must free the
* returned statistics with hdfsFileFreeReadStatistics.
* @return 0 if the statistics were successfully returned,
* -1 otherwise. On a failure, please check errno against
* ENOTSUP. webhdfs, LocalFilesystem, and so forth may
* not support read statistics.
*/
LIBHDFS_EXTERNAL
int hdfsFileGetReadStatistics(hdfsFile file, struct hdfsReadStatistics **stats);
/**
* @param stats HDFS read statistics for a file.
*
* @return the number of remote bytes read.
*/
LIBHDFS_EXTERNAL
int64_t hdfsReadStatisticsGetRemoteBytesRead(
const struct hdfsReadStatistics *stats);
/**
* Clear the read statistics for a file.
*
* @param file The file to clear the read statistics of.
*
* @return 0 on success; the error code otherwise.
* EINVAL: the file is not open for reading.
* ENOTSUP: the file does not support clearing the read
* statistics.
* Errno will also be set to this code on failure.
*/
LIBHDFS_EXTERNAL
int hdfsFileClearReadStatistics(hdfsFile file);
/**
* Free some HDFS read statistics.
*
* @param stats The HDFS read statistics to free.
*/
LIBHDFS_EXTERNAL
void hdfsFileFreeReadStatistics(struct hdfsReadStatistics *stats);
/**
* hdfsConnectAsUser - Connect to a hdfs file system as a specific user
* Connect to the hdfs.
* @param nn The NameNode. See hdfsBuilderSetNameNode for details.
* @param port The port on which the server is listening.
* @param user the user name (this is hadoop domain user). Passing NULL is
* equivalent to hdfsConnect(host, port).
* @return Returns a handle to the filesystem or NULL on error.
* @deprecated Use hdfsBuilderConnect instead.
*/
LIBHDFS_EXTERNAL
hdfsFS hdfsConnectAsUser(const char *nn, tPort port, const char *user);
/**
* hdfsConnect - Connect to a hdfs file system.
* Connect to the hdfs.
* @param nn The NameNode. See hdfsBuilderSetNameNode for details.
* @param port The port on which the server is listening.
* @return Returns a handle to the filesystem or NULL on error.
* @deprecated Use hdfsBuilderConnect instead.
*/
LIBHDFS_EXTERNAL
hdfsFS hdfsConnect(const char *nn, tPort port);
/**
* hdfsConnect - Connect to an hdfs file system.
*
* Forces a new instance to be created
*
* @param nn The NameNode. See hdfsBuilderSetNameNode for details.
* @param port The port on which the server is listening.
* @param user The user name to use when connecting
* @return Returns a handle to the filesystem or NULL on error.
* @deprecated Use hdfsBuilderConnect instead.
*/
LIBHDFS_EXTERNAL
hdfsFS hdfsConnectAsUserNewInstance(const char *nn, tPort port,
const char *user);
/**
* hdfsConnect - Connect to an hdfs file system.
*
* Forces a new instance to be created
*
* @param nn The NameNode. See hdfsBuilderSetNameNode for details.
* @param port The port on which the server is listening.
* @return Returns a handle to the filesystem or NULL on error.
* @deprecated Use hdfsBuilderConnect instead.
*/
LIBHDFS_EXTERNAL
hdfsFS hdfsConnectNewInstance(const char *nn, tPort port);
/**
* Connect to HDFS using the parameters defined by the builder.
*
* The HDFS builder will be freed, whether or not the connection was
* successful.
*
* Every successful call to hdfsBuilderConnect should be matched with a call
* to hdfsDisconnect, when the hdfsFS is no longer needed.
*
* @param bld The HDFS builder
* @return Returns a handle to the filesystem, or NULL on error.
*/
LIBHDFS_EXTERNAL
hdfsFS hdfsBuilderConnect(struct hdfsBuilder *bld);
/**
* Create an HDFS builder.
*
* @return The HDFS builder, or NULL on error.
*/
LIBHDFS_EXTERNAL
struct hdfsBuilder *hdfsNewBuilder(void);
/**
* Force the builder to always create a new instance of the FileSystem,
* rather than possibly finding one in the cache.
*
* @param bld The HDFS builder
*/
LIBHDFS_EXTERNAL
void hdfsBuilderSetForceNewInstance(struct hdfsBuilder *bld);
/**
* Set the HDFS NameNode to connect to.
*
* @param bld The HDFS builder
* @param nn The NameNode to use.
*
* If the string given is 'default', the default NameNode
* configuration will be used (from the XML configuration files)
*
* If NULL is given, a LocalFileSystem will be created.
*
* If the string starts with a protocol type such as file:// or
* hdfs://, this protocol type will be used. If not, the
* hdfs:// protocol type will be used.
*
* You may specify a NameNode port in the usual way by
* passing a string of the format hdfs://<hostname>:<port>.
* Alternately, you may set the port with
* hdfsBuilderSetNameNodePort. However, you must not pass the
* port in two different ways.
*/
LIBHDFS_EXTERNAL
void hdfsBuilderSetNameNode(struct hdfsBuilder *bld, const char *nn);
/**
* Set the port of the HDFS NameNode to connect to.
*
* @param bld The HDFS builder
* @param port The port.
*/
LIBHDFS_EXTERNAL
void hdfsBuilderSetNameNodePort(struct hdfsBuilder *bld, tPort port);
/**
* Set the username to use when connecting to the HDFS cluster.
*
* @param bld The HDFS builder
* @param userName The user name. The string will be shallow-copied.
*/
LIBHDFS_EXTERNAL
void hdfsBuilderSetUserName(struct hdfsBuilder *bld, const char *userName);
/**
* Set the path to the Kerberos ticket cache to use when connecting to
* the HDFS cluster.
*
* @param bld The HDFS builder
* @param kerbTicketCachePath The Kerberos ticket cache path. The string
* will be shallow-copied.
*/
LIBHDFS_EXTERNAL
void hdfsBuilderSetKerbTicketCachePath(struct hdfsBuilder *bld,
const char *kerbTicketCachePath);
/**
* Free an HDFS builder.
*
* It is normally not necessary to call this function since
* hdfsBuilderConnect frees the builder.
*
* @param bld The HDFS builder
*/
LIBHDFS_EXTERNAL
void hdfsFreeBuilder(struct hdfsBuilder *bld);
/**
* Set a configuration string for an HdfsBuilder.
*
* @param key The key to set.
* @param val The value, or NULL to set no value.
* This will be shallow-copied. You are responsible for
* ensuring that it remains valid until the builder is
* freed.
*
* @return 0 on success; nonzero error code otherwise.
*/
LIBHDFS_EXTERNAL
int hdfsBuilderConfSetStr(struct hdfsBuilder *bld, const char *key,
const char *val);
/**
* Get a configuration string.
*
* @param key The key to find
* @param val (out param) The value. This will be set to NULL if the
* key isn't found. You must free this string with
* hdfsConfStrFree.
*
* @return 0 on success; nonzero error code otherwise.
* Failure to find the key is not an error.
*/
LIBHDFS_EXTERNAL
int hdfsConfGetStr(const char *key, char **val);
/**
* Get a configuration integer.
*
* @param key The key to find
* @param val (out param) The value. This will NOT be changed if the
* key isn't found.
*
* @return 0 on success; nonzero error code otherwise.
* Failure to find the key is not an error.
*/
LIBHDFS_EXTERNAL
int hdfsConfGetInt(const char *key, int32_t *val);
/**
* Free a configuration string found with hdfsConfGetStr.
*
* @param val A configuration string obtained from hdfsConfGetStr
*/
LIBHDFS_EXTERNAL
void hdfsConfStrFree(char *val);
/**
* hdfsDisconnect - Disconnect from the hdfs file system.
* Disconnect from hdfs.
* @param fs The configured filesystem handle.
* @return Returns 0 on success, -1 on error.
* Even if there is an error, the resources associated with the
* hdfsFS will be freed.
*/
LIBHDFS_EXTERNAL
int hdfsDisconnect(hdfsFS fs);
/**
* hdfsOpenFile - Open a hdfs file in given mode.
* @param fs The configured filesystem handle.
* @param path The full path to the file.
* @param flags - an | of bits/fcntl.h file flags - supported flags are
* O_RDONLY, O_WRONLY (meaning create or overwrite i.e., implies O_TRUNC),
* O_WRONLY|O_APPEND. Other flags are generally ignored other than (O_RDWR ||
* (O_EXCL & O_CREAT)), which returns NULL and sets errno to ENOTSUP.
* @param bufferSize Size of buffer for read/write - pass 0 if you want
* to use the default configured values.
* @param replication Block replication - pass 0 if you want to use
* the default configured values.
* @param blocksize Size of block - pass 0 if you want to use the
* default configured values.
* @return Returns the handle to the open file or NULL on error.
*/
LIBHDFS_EXTERNAL
hdfsFile hdfsOpenFile(hdfsFS fs, const char *path, int flags, int bufferSize,
short replication, tSize blocksize);
/**
* hdfsTruncateFile - Truncate an hdfs file to the given length.
* @param fs The configured filesystem handle.
* @param path The full path to the file.
* @param newlength The size the file is to be truncated to
* @return 1 if the file has been truncated to the desired newlength
* and is immediately available to be reused for write operations
* such as append.
* 0 if a background process of adjusting the length of the last
* block has been started, and clients should wait for it to
* complete before proceeding with further file updates.
* -1 on error.
*/
int hdfsTruncateFile(hdfsFS fs, const char *path, tOffset newlength);
/**
* hdfsUnbufferFile - Reduce the buffering done on a file.
*
* @param file The file to unbuffer.
* @return 0 on success
* ENOTSUP if the file does not support unbuffering
* Errno will also be set to this value.
*/
LIBHDFS_EXTERNAL
int hdfsUnbufferFile(hdfsFile file);
/**
* hdfsCloseFile - Close an open file.
* @param fs The configured filesystem handle.
* @param file The file handle.
* @return Returns 0 on success, -1 on error.
* On error, errno will be set appropriately.
* If the hdfs file was valid, the memory associated with it will
* be freed at the end of this call, even if there was an I/O
* error.
*/
LIBHDFS_EXTERNAL
int hdfsCloseFile(hdfsFS fs, hdfsFile file);
/**
* hdfsExists - Checks if a given path exists on the filesystem
* @param fs The configured filesystem handle.
* @param path The path to look for
* @return Returns 0 on success, -1 on error.
*/
LIBHDFS_EXTERNAL
int hdfsExists(hdfsFS fs, const char *path);
/**
* hdfsSeek - Seek to given offset in file.
* This works only for files opened in read-only mode.
* @param fs The configured filesystem handle.
* @param file The file handle.
* @param desiredPos Offset into the file to seek into.
* @return Returns 0 on success, -1 on error.
*/
LIBHDFS_EXTERNAL
int hdfsSeek(hdfsFS fs, hdfsFile file, tOffset desiredPos);
/**
* hdfsTell - Get the current offset in the file, in bytes.
* @param fs The configured filesystem handle.
* @param file The file handle.
* @return Current offset, -1 on error.
*/
LIBHDFS_EXTERNAL
tOffset hdfsTell(hdfsFS fs, hdfsFile file);
/**
* hdfsRead - Read data from an open file.
* @param fs The configured filesystem handle.
* @param file The file handle.
* @param buffer The buffer to copy read bytes into.
* @param length The length of the buffer.
* @return On success, a positive number indicating how many bytes
* were read.
* On end-of-file, 0.
* On error, -1. Errno will be set to the error code.
* Just like the POSIX read function, hdfsRead will return -1
* and set errno to EINTR if data is temporarily unavailable,
* but we are not yet at the end of the file.
*/
LIBHDFS_EXTERNAL
tSize hdfsRead(hdfsFS fs, hdfsFile file, void *buffer, tSize length);
/**
* hdfsPread - Positional read of data from an open file.
* @param fs The configured filesystem handle.
* @param file The file handle.
* @param position Position from which to read
* @param buffer The buffer to copy read bytes into.
* @param length The length of the buffer.
* @return See hdfsRead
*/
LIBHDFS_EXTERNAL
tSize hdfsPread(hdfsFS fs, hdfsFile file, tOffset position, void *buffer,
tSize length);
/**
* hdfsWrite - Write data into an open file.
* @param fs The configured filesystem handle.
* @param file The file handle.
* @param buffer The data.
* @param length The no. of bytes to write.
* @return Returns the number of bytes written, -1 on error.
*/
LIBHDFS_EXTERNAL
tSize hdfsWrite(hdfsFS fs, hdfsFile file, const void *buffer, tSize length);
/**
* hdfsFlush - Flush the data.
* @param fs The configured filesystem handle.
* @param file The file handle.
* @return Returns 0 on success, -1 on error.
*/
LIBHDFS_EXTERNAL
int hdfsFlush(hdfsFS fs, hdfsFile file);
/**
* hdfsHFlush - Flush out the data in client's user buffer. After the
* return of this call, new readers will see the data.
* @param fs configured filesystem handle
* @param file file handle
* @return 0 on success, -1 on error and sets errno
*/
LIBHDFS_EXTERNAL
int hdfsHFlush(hdfsFS fs, hdfsFile file);
/**
* hdfsHSync - Similar to posix fsync, flush out the data in the client's
* user buffer all the way to the disk device (but the disk may have
* it in its cache).
* @param fs configured filesystem handle
* @param file file handle
* @return 0 on success, -1 on error and sets errno
*/
LIBHDFS_EXTERNAL
int hdfsHSync(hdfsFS fs, hdfsFile file);
/**
* hdfsAvailable - Number of bytes that can be read from this
* input stream without blocking.
* @param fs The configured filesystem handle.
* @param file The file handle.
* @return Returns available bytes; -1 on error.
*/
LIBHDFS_EXTERNAL
int hdfsAvailable(hdfsFS fs, hdfsFile file);
/**
* hdfsCopy - Copy file from one filesystem to another.
* @param srcFS The handle to source filesystem.
* @param src The path of source file.
* @param dstFS The handle to destination filesystem.
* @param dst The path of destination file.
* @return Returns 0 on success, -1 on error.
*/
LIBHDFS_EXTERNAL
int hdfsCopy(hdfsFS srcFS, const char *src, hdfsFS dstFS, const char *dst);
/**
* hdfsMove - Move file from one filesystem to another.
* @param srcFS The handle to source filesystem.
* @param src The path of source file.
* @param dstFS The handle to destination filesystem.
* @param dst The path of destination file.
* @return Returns 0 on success, -1 on error.
*/
LIBHDFS_EXTERNAL
int hdfsMove(hdfsFS srcFS, const char *src, hdfsFS dstFS, const char *dst);
/**
* hdfsDelete - Delete file.
* @param fs The configured filesystem handle.
* @param path The path of the file.
* @param recursive if path is a directory and set to
* non-zero, the directory is deleted else throws an exception. In
* case of a file the recursive argument is irrelevant.
* @return Returns 0 on success, -1 on error.
*/
LIBHDFS_EXTERNAL
int hdfsDelete(hdfsFS fs, const char *path, int recursive);
/**
* hdfsRename - Rename file.
* @param fs The configured filesystem handle.
* @param oldPath The path of the source file.
* @param newPath The path of the destination file.
* @return Returns 0 on success, -1 on error.
*/
LIBHDFS_EXTERNAL
int hdfsRename(hdfsFS fs, const char *oldPath, const char *newPath);
/**
* hdfsGetWorkingDirectory - Get the current working directory for
* the given filesystem.
* @param fs The configured filesystem handle.
* @param buffer The user-buffer to copy path of cwd into.
* @param bufferSize The length of user-buffer.
* @return Returns buffer, NULL on error.
*/
LIBHDFS_EXTERNAL
char *hdfsGetWorkingDirectory(hdfsFS fs, char *buffer, size_t bufferSize);
/**
* hdfsSetWorkingDirectory - Set the working directory. All relative
* paths will be resolved relative to it.
* @param fs The configured filesystem handle.
* @param path The path of the new 'cwd'.
* @return Returns 0 on success, -1 on error.
*/
LIBHDFS_EXTERNAL
int hdfsSetWorkingDirectory(hdfsFS fs, const char *path);
/**
* hdfsCreateDirectory - Make the given file and all non-existent
* parents into directories.
* @param fs The configured filesystem handle.
* @param path The path of the directory.
* @return Returns 0 on success, -1 on error.
*/
LIBHDFS_EXTERNAL
int hdfsCreateDirectory(hdfsFS fs, const char *path);
/**
* hdfsSetReplication - Set the replication of the specified
* file to the supplied value
* @param fs The configured filesystem handle.
* @param path The path of the file.
* @return Returns 0 on success, -1 on error.
*/
LIBHDFS_EXTERNAL
int hdfsSetReplication(hdfsFS fs, const char *path, int16_t replication);
/**
* hdfsFileInfo - Information about a file/directory.
*/
typedef struct {
tObjectKind mKind; /* file or directory */
char *mName; /* the name of the file */
tTime mLastMod; /* the last modification time for the file in seconds */
tOffset mSize; /* the size of the file in bytes */
short mReplication; /* the count of replicas */
tOffset mBlockSize; /* the block size for the file */
char *mOwner; /* the owner of the file */
char *mGroup; /* the group associated with the file */
short mPermissions; /* the permissions associated with the file */
tTime mLastAccess; /* the last access time for the file in seconds */
} hdfsFileInfo;
/**
* hdfsListDirectory - Get list of files/directories for a given
* directory-path. hdfsFreeFileInfo should be called to deallocate memory.
* @param fs The configured filesystem handle.
* @param path The path of the directory.
* @param numEntries Set to the number of files/directories in path.
* @return Returns a dynamically-allocated array of hdfsFileInfo
* objects; NULL on error.
*/
LIBHDFS_EXTERNAL
hdfsFileInfo *hdfsListDirectory(hdfsFS fs, const char *path, int *numEntries);
/**
* hdfsGetPathInfo - Get information about a path as a (dynamically
* allocated) single hdfsFileInfo struct. hdfsFreeFileInfo should be
* called when the pointer is no longer needed.
* @param fs The configured filesystem handle.
* @param path The path of the file.
* @return Returns a dynamically-allocated hdfsFileInfo object;
* NULL on error.
*/
LIBHDFS_EXTERNAL
hdfsFileInfo *hdfsGetPathInfo(hdfsFS fs, const char *path);
/**
* hdfsFreeFileInfo - Free up the hdfsFileInfo array (including fields)
* @param hdfsFileInfo The array of dynamically-allocated hdfsFileInfo
* objects.
* @param numEntries The size of the array.
*/
LIBHDFS_EXTERNAL
void hdfsFreeFileInfo(hdfsFileInfo *hdfsFileInfo, int numEntries);
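/*
 * Usage sketch: listing a directory and releasing the result with
 * hdfsFreeFileInfo. fs is assumed to come from hdfsConnect and the path is a
 * placeholder; error handling is omitted.
 *
 *   int n = 0;
 *   hdfsFileInfo *entries = hdfsListDirectory(fs, "/user/demo", &n);
 *   if (entries != NULL) {
 *     for (int i = 0; i < n; ++i) {
 *       printf("%s\t%lld bytes\n", entries[i].mName, (long long)entries[i].mSize);
 *     }
 *     hdfsFreeFileInfo(entries, n);
 *   }
 */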
/**
* hdfsFileIsEncrypted: determine if a file is encrypted based on its
* hdfsFileInfo.
* @return -1 if there was an error (errno will be set), 0 if the file is
* not encrypted, 1 if the file is encrypted.
*/
LIBHDFS_EXTERNAL
int hdfsFileIsEncrypted(hdfsFileInfo *hdfsFileInfo);
/**
 * hdfsGetHosts - Get hostnames where a particular block of a file
 * (determined by the start offset and length) is stored. The last
 * element in the array is NULL. Due to replication, a single block
 * could be present on multiple hosts.
* @param fs The configured filesystem handle.
* @param path The path of the file.
* @param start The start of the block.
* @param length The length of the block.
* @return Returns a dynamically-allocated 2-d array of blocks-hosts;
* NULL on error.
*/
LIBHDFS_EXTERNAL
char ***hdfsGetHosts(hdfsFS fs, const char *path, tOffset start,
tOffset length);
/**
 * hdfsFreeHosts - Free up the structure returned by hdfsGetHosts.
 * @param blockHosts The dynamically-allocated array of block-host
 * names returned by hdfsGetHosts.
LIBHDFS_EXTERNAL
void hdfsFreeHosts(char ***blockHosts);
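/*
 * Usage sketch: asking which hosts store the first 128 MB of a file and
 * freeing the result. fs and the path are assumptions; the outer and inner
 * arrays are treated as NULL-terminated, as described above, and error
 * handling is omitted.
 *
 *   char ***hosts = hdfsGetHosts(fs, "/user/demo/big.dat", 0, 128LL * 1024 * 1024);
 *   if (hosts != NULL) {
 *     for (int b = 0; hosts[b] != NULL; ++b) {       // one entry per block
 *       for (int h = 0; hosts[b][h] != NULL; ++h) {  // replicas of block b
 *         printf("block %d -> %s\n", b, hosts[b][h]);
 *       }
 *     }
 *     hdfsFreeHosts(hosts);
 *   }
 */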
/**
* hdfsGetDefaultBlockSize - Get the default blocksize.
*
* @param fs The configured filesystem handle.
* @deprecated Use hdfsGetDefaultBlockSizeAtPath instead.
*
* @return Returns the default blocksize, or -1 on error.
*/
LIBHDFS_EXTERNAL
tOffset hdfsGetDefaultBlockSize(hdfsFS fs);
/**
* hdfsGetDefaultBlockSizeAtPath - Get the default blocksize at the
* filesystem indicated by a given path.
*
* @param fs The configured filesystem handle.
* @param path The given path will be used to locate the actual
* filesystem. The full path does not have to exist.
*
* @return Returns the default blocksize, or -1 on error.
*/
LIBHDFS_EXTERNAL
tOffset hdfsGetDefaultBlockSizeAtPath(hdfsFS fs, const char *path);
/**
* hdfsGetCapacity - Return the raw capacity of the filesystem.
* @param fs The configured filesystem handle.
* @return Returns the raw-capacity; -1 on error.
*/
LIBHDFS_EXTERNAL
tOffset hdfsGetCapacity(hdfsFS fs);
/**
* hdfsGetUsed - Return the total raw size of all files in the filesystem.
* @param fs The configured filesystem handle.
* @return Returns the total-size; -1 on error.
*/
LIBHDFS_EXTERNAL
tOffset hdfsGetUsed(hdfsFS fs);
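/*
 * Usage sketch: reporting overall usage together with the default block size
 * for a path. fs is assumed to come from hdfsConnect and the path is a
 * placeholder.
 *
 *   tOffset capacity = hdfsGetCapacity(fs);
 *   tOffset used = hdfsGetUsed(fs);
 *   tOffset block = hdfsGetDefaultBlockSizeAtPath(fs, "/user/demo");
 *   if (capacity >= 0 && used >= 0) {
 *     printf("used %lld of %lld bytes (block size %lld)\n",
 *            (long long)used, (long long)capacity, (long long)block);
 *   }
 */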
/**
* Change the user and/or group of a file or directory.
*
* @param fs The configured filesystem handle.
* @param path the path to the file or directory
* @param owner User string. Set to NULL for 'no change'
* @param group Group string. Set to NULL for 'no change'
* @return 0 on success else -1
*/
LIBHDFS_EXTERNAL
int hdfsChown(hdfsFS fs, const char *path, const char *owner,
const char *group);
/**
 * hdfsChmod - Change the permissions of the given file or directory.
* @param fs The configured filesystem handle.
* @param path the path to the file or directory
* @param mode the bitmask to set it to
* @return 0 on success else -1
*/
LIBHDFS_EXTERNAL
int hdfsChmod(hdfsFS fs, const char *path, short mode);
/**
 * hdfsUtime - Change the modification and access times of the given path.
* @param fs The configured filesystem handle.
* @param path the path to the file or directory
* @param mtime new modification time or -1 for no change
* @param atime new access time or -1 for no change
* @return 0 on success else -1
*/
LIBHDFS_EXTERNAL
int hdfsUtime(hdfsFS fs, const char *path, tTime mtime, tTime atime);
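/*
 * Usage sketch: adjusting ownership, permissions, and timestamps of a path.
 * The user name and path are placeholders; per the docs above, NULL and -1
 * mean "no change". Error handling is omitted.
 *
 *   hdfsChown(fs, "/user/demo/report.txt", "alice", NULL);   // change owner only
 *   hdfsChmod(fs, "/user/demo/report.txt", 0640);
 *   hdfsUtime(fs, "/user/demo/report.txt", time(NULL), -1);  // touch mtime only
 */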
/**
* Allocate a zero-copy options structure.
*
* You must free all options structures allocated with this function using
* hadoopRzOptionsFree.
*
* @return A zero-copy options structure, or NULL if one could
* not be allocated. If NULL is returned, errno will
* contain the error number.
*/
LIBHDFS_EXTERNAL
struct hadoopRzOptions *hadoopRzOptionsAlloc(void);
/**
 * Determine whether we should skip checksums in zero-copy reads (hadoopReadZero).
*
* @param opts The options structure.
* @param skip Nonzero to skip checksums sometimes; zero to always
* check them.
*
* @return 0 on success; -1 plus errno on failure.
*/
LIBHDFS_EXTERNAL
int hadoopRzOptionsSetSkipChecksum(struct hadoopRzOptions *opts, int skip);
/**
 * Set the ByteBufferPool to use with zero-copy reads (hadoopReadZero).
*
* @param opts The options structure.
* @param className If this is NULL, we will not use any
* ByteBufferPool. If this is non-NULL, it will be
* treated as the name of the pool class to use.
* For example, you can use
* ELASTIC_BYTE_BUFFER_POOL_CLASS.
*
* @return 0 if the ByteBufferPool class was found and
* instantiated;
* -1 plus errno otherwise.
*/
LIBHDFS_EXTERNAL
int hadoopRzOptionsSetByteBufferPool(struct hadoopRzOptions *opts,
const char *className);
/**
 * Free a hadoopRzOptions structure.
*
* @param opts The options structure to free.
* Any associated ByteBufferPool will also be freed.
*/
LIBHDFS_EXTERNAL
void hadoopRzOptionsFree(struct hadoopRzOptions *opts);
/**
* Perform a byte buffer read.
* If possible, this will be a zero-copy (mmap) read.
*
* @param file The file to read from.
* @param opts An options structure created by hadoopRzOptionsAlloc.
* @param maxLength The maximum length to read. We may read fewer bytes
* than this length.
*
* @return On success, we will return a new hadoopRzBuffer.
* This buffer will continue to be valid and readable
 * until it is released by hadoopRzBufferFree. Failure to
* release a buffer will lead to a memory leak.
* You can access the data within the hadoopRzBuffer with
* hadoopRzBufferGet. If you have reached EOF, the data
* within the hadoopRzBuffer will be NULL. You must still
* free hadoopRzBuffer instances containing NULL.
*
* On failure, we will return NULL plus an errno code.
* errno = EOPNOTSUPP indicates that we could not do a
* zero-copy read, and there was no ByteBufferPool
* supplied.
*/
LIBHDFS_EXTERNAL
struct hadoopRzBuffer *hadoopReadZero(hdfsFile file,
struct hadoopRzOptions *opts,
int32_t maxLength);
/**
 * Determine the length of the buffer returned from hadoopReadZero.
 *
 * @param buffer a buffer returned from hadoopReadZero.
* @return the length of the buffer.
*/
LIBHDFS_EXTERNAL
int32_t hadoopRzBufferLength(const struct hadoopRzBuffer *buffer);
/**
 * Get a pointer to the raw buffer returned from hadoopReadZero.
*
* To find out how many bytes this buffer contains, call
* hadoopRzBufferLength.
*
 * @param buffer a buffer returned from hadoopReadZero.
* @return a pointer to the start of the buffer. This will be
* NULL when end-of-file has been reached.
*/
LIBHDFS_EXTERNAL
const void *hadoopRzBufferGet(const struct hadoopRzBuffer *buffer);
/**
 * Release a buffer obtained through hadoopReadZero.
*
* @param file The hdfs stream that created this buffer. This must be
* the same stream you called hadoopReadZero on.
* @param buffer The buffer to release.
*/
LIBHDFS_EXTERNAL
void hadoopRzBufferFree(hdfsFile file, struct hadoopRzBuffer *buffer);
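/*
 * Usage sketch: the full zero-copy read sequence described above. file is
 * assumed to be an hdfsFile opened for reading; ELASTIC_BYTE_BUFFER_POOL_CLASS
 * is the pool class name mentioned in hadoopRzOptionsSetByteBufferPool, and
 * error handling is reduced to breaking out of the loop.
 *
 *   struct hadoopRzOptions *opts = hadoopRzOptionsAlloc();
 *   hadoopRzOptionsSetSkipChecksum(opts, 1);
 *   hadoopRzOptionsSetByteBufferPool(opts, ELASTIC_BYTE_BUFFER_POOL_CLASS);
 *   for (;;) {
 *     struct hadoopRzBuffer *buf = hadoopReadZero(file, opts, 1024 * 1024);
 *     if (buf == NULL) break;                 // error: errno is set
 *     const void *data = hadoopRzBufferGet(buf);
 *     int32_t len = hadoopRzBufferLength(buf);
 *     if (data == NULL) {                     // EOF: the buffer must still be freed
 *       hadoopRzBufferFree(file, buf);
 *       break;
 *     }
 *     // ... consume len bytes starting at data ...
 *     hadoopRzBufferFree(file, buf);
 *   }
 *   hadoopRzOptionsFree(opts);
 */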
#ifdef __cplusplus
}
#endif
#undef LIBHDFS_EXTERNAL
#endif /*LIBHDFS_HDFS_H*/
/**
* vim: ts=4: sw=4: et
*/

View File

@ -30,6 +30,8 @@ shift 1
# Enable support for Google Cloud Platform (GCP)
export TF_NEED_GCP=1
# Enable support for HDFS
export TF_NEED_HDFS=1
if [[ "$1" == "--disable-gcp" ]]; then
export TF_NEED_GCP=0