diff --git a/configure b/configure index ce75bb490a7..feac1406642 100755 --- a/configure +++ b/configure @@ -1,7 +1,5 @@ #!/usr/bin/env bash -DO_NOT_SUBMIT_WARNING="Unofficial setting. DO NOT SUBMIT!!!" - # Find out the absolute path to where ./configure resides pushd `dirname $0` #> /dev/null SOURCE_BASE_DIR=`pwd -P` @@ -60,6 +58,28 @@ else perl -pi -e "s,WITH_GCP_SUPPORT = (False|True),WITH_GCP_SUPPORT = False,s" tensorflow/core/platform/default/build_config.bzl fi +while [ "$TF_NEED_HDFS" == "" ]; do + read -p "Do you wish to build TensorFlow with "\ +"Hadoop File System support? [y/N] " INPUT + case $INPUT in + [Yy]* ) echo "Hadoop File System support will be enabled for "\ +"TensorFlow"; TF_NEED_HDFS=1;; + [Nn]* ) echo "No Hadoop File System support will be enabled for "\ +"TensorFlow"; TF_NEED_HDFS=0;; + "" ) echo "No Hadoop File System support will be enabled for "\ +"TensorFlow"; TF_NEED_HDFS=0;; + * ) echo "Invalid selection: " $INPUT;; + esac +done + +if [ "$TF_NEED_HDFS" == "1" ]; then + # Update Bazel build configuration. + perl -pi -e "s,WITH_HDFS_SUPPORT = (False|True),WITH_HDFS_SUPPORT = True,s" tensorflow/core/platform/default/build_config.bzl +else + # Update Bazel build configuration. + perl -pi -e "s,WITH_HDFS_SUPPORT = (False|True),WITH_HDFS_SUPPORT = False,s" tensorflow/core/platform/default/build_config.bzl +fi + ## Find swig path if [ -z "$SWIG_PATH" ]; then SWIG_PATH=`type -p swig 2> /dev/null` diff --git a/tensorflow/BUILD b/tensorflow/BUILD index 5b88f1eaa59..39b8868d520 100644 --- a/tensorflow/BUILD +++ b/tensorflow/BUILD @@ -131,6 +131,7 @@ filegroup( "//tensorflow/core/ops/compat:all_files", "//tensorflow/core/platform/cloud:all_files", "//tensorflow/core/platform/default/build_config:all_files", + "//tensorflow/core/platform/hadoop:all_files", "//tensorflow/core/util/ctc:all_files", "//tensorflow/examples/android:all_files", "//tensorflow/examples/how_tos/reading_data:all_files", @@ -166,6 +167,7 @@ filegroup( "//tensorflow/tensorboard/lib:all_files", "//tensorflow/tensorboard/lib/python:all_files", "//tensorflow/tensorboard/scripts:all_files", + "//tensorflow/third_party/hadoop:all_files", "//tensorflow/tools/dist_test/server:all_files", "//tensorflow/tools/docker:all_files", "//tensorflow/tools/docker/notebooks:all_files", diff --git a/tensorflow/contrib/makefile/proto_text_cc_files.txt b/tensorflow/contrib/makefile/proto_text_cc_files.txt index 648be42db58..9b0a0942cd5 100644 --- a/tensorflow/contrib/makefile/proto_text_cc_files.txt +++ b/tensorflow/contrib/makefile/proto_text_cc_files.txt @@ -5,6 +5,7 @@ tensorflow/core/platform/tensor_coding.cc tensorflow/core/platform/protobuf_util.cc tensorflow/core/platform/posix/posix_file_system.cc tensorflow/core/platform/posix/port.cc +tensorflow/core/platform/posix/error.cc tensorflow/core/platform/posix/env.cc tensorflow/core/platform/load_library.cc tensorflow/core/platform/file_system.cc diff --git a/tensorflow/core/BUILD b/tensorflow/core/BUILD index 2ad40d0cf3c..fa5441976d9 100644 --- a/tensorflow/core/BUILD +++ b/tensorflow/core/BUILD @@ -630,6 +630,7 @@ filegroup( "platform/default/test_benchmark.*", "platform/cuda.h", "platform/google/**/*", + "platform/hadoop/**/*", "platform/jpeg.*", "platform/png.*", "platform/gif.*", diff --git a/tensorflow/core/platform/default/build_config.bzl b/tensorflow/core/platform/default/build_config.bzl index c357e3d23e5..8a8dfaef603 100644 --- a/tensorflow/core/platform/default/build_config.bzl +++ b/tensorflow/core/platform/default/build_config.bzl @@ -3,8 +3,9 @@ load("@protobuf//:protobuf.bzl", "cc_proto_library") load("@protobuf//:protobuf.bzl", "py_proto_library") -# configure may change the following line to True +# configure may change the following lines to True WITH_GCP_SUPPORT = False +WITH_HDFS_SUPPORT = False # Appends a suffix to a list of deps. def tf_deps(deps, suffix): @@ -123,5 +124,9 @@ def tf_kernel_tests_linkstatic(): return 0 def tf_additional_lib_deps(): - return (["//tensorflow/core/platform/cloud:gcs_file_system"] - if WITH_GCP_SUPPORT else []) + deps = [] + if WITH_GCP_SUPPORT: + deps.append("//tensorflow/core/platform/cloud:gcs_file_system") + if WITH_HDFS_SUPPORT: + deps.append("//tensorflow/core/platform/hadoop:hadoop_file_system") + return deps diff --git a/tensorflow/core/platform/hadoop/BUILD b/tensorflow/core/platform/hadoop/BUILD new file mode 100644 index 00000000000..457b4fe14cb --- /dev/null +++ b/tensorflow/core/platform/hadoop/BUILD @@ -0,0 +1,69 @@ +# Description: +# Hadoop file system implementation. + +package( + default_visibility = ["//visibility:public"], +) + +licenses(["notice"]) # Apache 2.0 + +load( + "//tensorflow:tensorflow.bzl", + "tf_cc_test", +) + +filegroup( + name = "all_files", + srcs = glob( + ["**/*"], + exclude = [ + "**/METADATA", + "**/OWNERS", + ], + ), + visibility = ["//tensorflow:__subpackages__"], +) + +cc_library( + name = "hadoop_file_system", + srcs = ["hadoop_file_system.cc"], + hdrs = ["hadoop_file_system.h"], + deps = [ + "//tensorflow/core:lib", + "//tensorflow/core:lib_internal", + "//tensorflow/third_party/hadoop:hdfs", + ], + alwayslink = 1, +) + +# This test is set to manual because it requires downloading the Hadoop +# distribution to run. To run this test: +# 1. Ensure $JAVA_HOME is set. +# 2. Download the binary Hadoop distribution from: +# http://hadoop.apache.org/releases.html +# 3. Extract the Hadoop distribution and run: +# source libexec/hadoop-config.sh +# 4. bazel test \ +# --test_env=LD_LIBRARY_PATH=$JAVA_HOME/jre/lib/amd64/server \ +# --test_env=HADOOP_HDFS_HOME=$HADOOP_HDFS_HOME \ +# --test_env=CLASSPATH=$($HADOOP_HDFS_HOME/bin/hadoop classpath --glob) \ +# --test_strategy=local \ +# :hadoop_file_system_test +tf_cc_test( + name = "hadoop_file_system_test", + size = "small", + srcs = [ + "hadoop_file_system_test.cc", + ], + tags = [ + "manual", + "notap", + ], + deps = [ + ":hadoop_file_system", + "//tensorflow/core:lib", + "//tensorflow/core:lib_internal", + "//tensorflow/core:test", + "//tensorflow/core:test_main", + ], +) diff --git a/tensorflow/core/platform/hadoop/hadoop_file_system.cc b/tensorflow/core/platform/hadoop/hadoop_file_system.cc new file mode 100644 index 00000000000..662818cc9a7 --- /dev/null +++ b/tensorflow/core/platform/hadoop/hadoop_file_system.cc @@ -0,0 +1,431 @@ +/* Copyright 2016 The TensorFlow Authors. All Rights Reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +==============================================================================*/ + +#include "tensorflow/core/platform/hadoop/hadoop_file_system.h" + +#include <errno.h> + +#include "tensorflow/core/lib/core/status.h" +#include "tensorflow/core/lib/io/path.h" +#include "tensorflow/core/lib/strings/strcat.h" +#include "tensorflow/core/platform/env.h" +#include "tensorflow/core/platform/logging.h" +#include "tensorflow/core/platform/posix/error.h" +#include "tensorflow/third_party/hadoop/hdfs.h" + +namespace tensorflow { + +template <typename R, typename... Args> +Status BindFunc(void* handle, const char* name, + std::function<R(Args...)>* func) { + void* symbol_ptr = nullptr; + TF_RETURN_IF_ERROR( + Env::Default()->GetSymbolFromLibrary(handle, name, &symbol_ptr)); + *func = reinterpret_cast<R (*)(Args...)>(symbol_ptr); + return Status::OK(); +} + +class LibHDFS { + public: + static LibHDFS* Load() { + static LibHDFS* lib = []() -> LibHDFS* { + LibHDFS* lib = new LibHDFS; + lib->LoadAndBind(); + return lib; + }(); + + return lib; + } + + // The status, if any, from failure to load. + Status status() { return status_; } + + std::function<hdfsFS(hdfsBuilder*)> hdfsBuilderConnect; + std::function<hdfsBuilder*()> hdfsNewBuilder; + std::function<void(hdfsBuilder*, const char*)> hdfsBuilderSetNameNode; + std::function<int(hdfsFS, hdfsFile)> hdfsCloseFile; + std::function<tSize(hdfsFS, hdfsFile, tOffset, void*, tSize)> hdfsPread; + std::function<tSize(hdfsFS, hdfsFile, const void*, tSize)> hdfsWrite; + std::function<int(hdfsFS, hdfsFile)> hdfsFlush; + std::function<int(hdfsFS, hdfsFile)> hdfsHSync; + std::function<hdfsFile(hdfsFS, const char*, int, int, short, tSize)> + hdfsOpenFile; + std::function<int(hdfsFS, const char*)> hdfsExists; + std::function<hdfsFileInfo*(hdfsFS, const char*, int*)> hdfsListDirectory; + std::function<void(hdfsFileInfo*, int)> hdfsFreeFileInfo; + std::function<int(hdfsFS, const char*, int recursive)> hdfsDelete; + std::function<int(hdfsFS, const char*)> hdfsCreateDirectory; + std::function<hdfsFileInfo*(hdfsFS, const char*)> hdfsGetPathInfo; + std::function<int(hdfsFS, const char*, const char*)> hdfsRename; + + private: + void LoadAndBind() { + auto TryLoadAndBind = [this](const char* name, void** handle) -> Status { + TF_RETURN_IF_ERROR(Env::Default()->LoadLibrary(name, handle)); +#define BIND_HDFS_FUNC(function) \ + TF_RETURN_IF_ERROR(BindFunc(*handle, #function, &function)); + + BIND_HDFS_FUNC(hdfsBuilderConnect); + BIND_HDFS_FUNC(hdfsNewBuilder); + BIND_HDFS_FUNC(hdfsBuilderSetNameNode); + BIND_HDFS_FUNC(hdfsCloseFile); + BIND_HDFS_FUNC(hdfsPread); + BIND_HDFS_FUNC(hdfsWrite); + BIND_HDFS_FUNC(hdfsFlush); + BIND_HDFS_FUNC(hdfsHSync); + BIND_HDFS_FUNC(hdfsOpenFile); + BIND_HDFS_FUNC(hdfsExists); + BIND_HDFS_FUNC(hdfsListDirectory); + BIND_HDFS_FUNC(hdfsFreeFileInfo); + BIND_HDFS_FUNC(hdfsDelete); + BIND_HDFS_FUNC(hdfsCreateDirectory); + BIND_HDFS_FUNC(hdfsGetPathInfo); + BIND_HDFS_FUNC(hdfsRename); +#undef BIND_HDFS_FUNC + return Status::OK(); + }; + + // libhdfs.so won't be in the standard locations. Use the path as specified + // in the libhdfs documentation. + char* hdfs_home = getenv("HADOOP_HDFS_HOME"); + if (hdfs_home == nullptr) { + status_ = errors::FailedPrecondition( + "Environment variable HADOOP_HDFS_HOME not set"); + return; + } + string path = io::JoinPath(hdfs_home, "lib", "native", "libhdfs.so"); + status_ = TryLoadAndBind(path.c_str(), &handle_); + return; + } + + Status status_; + void* handle_ = nullptr; +}; + +HadoopFileSystem::HadoopFileSystem() : hdfs_(LibHDFS::Load()) {} + +HadoopFileSystem::~HadoopFileSystem() {} + +// We rely on HDFS connection caching here. The HDFS client calls +// org.apache.hadoop.fs.FileSystem.get(), which caches the connection +// internally. +Status HadoopFileSystem::Connect(StringPiece fname, hdfsFS* fs) { + TF_RETURN_IF_ERROR(hdfs_->status()); + + if (!fname.Consume("hdfs://")) { + return errors::InvalidArgument("HDFS path must start with hdfs://"); + } + auto first_slash = fname.find('/'); + string namenode; + if (first_slash == string::npos) { + namenode = fname.ToString(); + } else { + namenode = fname.substr(0, first_slash).ToString(); + } + + hdfsBuilder* builder = hdfs_->hdfsNewBuilder(); + if (namenode == "localfilesystem") { + hdfs_->hdfsBuilderSetNameNode(builder, nullptr); + } else { + hdfs_->hdfsBuilderSetNameNode(builder, namenode.c_str()); + } + *fs = hdfs_->hdfsBuilderConnect(builder); + if (*fs == nullptr) { + return errors::NotFound(strerror(errno)); + } + return Status::OK(); +} + +string HadoopFileSystem::TranslateName(const string& name) const { + StringPiece sp = name; + sp.Consume("hdfs://"); + auto first_slash = sp.find('/'); + if (first_slash == string::npos) { + return string(); + } + sp.remove_prefix(first_slash); + return sp.ToString(); +} + +class HDFSRandomAccessFile : public RandomAccessFile { + public: + HDFSRandomAccessFile(const string& fname, LibHDFS* hdfs, hdfsFS fs, + hdfsFile file) + : filename_(fname), hdfs_(hdfs), fs_(fs), file_(file) {} + + ~HDFSRandomAccessFile() override { hdfs_->hdfsCloseFile(fs_, file_); } + + Status Read(uint64 offset, size_t n, StringPiece* result, + char* scratch) const override { + Status s; + char* dst = scratch; + while (n > 0 && s.ok()) { + tSize r = hdfs_->hdfsPread(fs_, file_, static_cast<tOffset>(offset), dst, + static_cast<tSize>(n)); + if (r > 0) { + dst += r; + n -= r; + offset += r; + } else if (r == 0) { + s = Status(error::OUT_OF_RANGE, "Read less bytes than requested"); + } else if (errno == EINTR || errno == EAGAIN) { + // hdfsPread may return EINTR too. Just retry. + } else { + s = IOError(filename_, errno); + } + } + *result = StringPiece(scratch, dst - scratch); + return s; + } + + private: + string filename_; + LibHDFS* hdfs_; + hdfsFS fs_; + hdfsFile file_; +}; + +Status HadoopFileSystem::NewRandomAccessFile( + const string& fname, std::unique_ptr<RandomAccessFile>* result) { + hdfsFS fs = nullptr; + TF_RETURN_IF_ERROR(Connect(fname, &fs)); + + hdfsFile file = + hdfs_->hdfsOpenFile(fs, TranslateName(fname).c_str(), O_RDONLY, 0, 0, 0); + if (file == nullptr) { + return IOError(fname, errno); + } + result->reset(new HDFSRandomAccessFile(fname, hdfs_, fs, file)); + return Status::OK(); +} + +class HDFSWritableFile : public WritableFile { + public: + HDFSWritableFile(const string& fname, LibHDFS* hdfs, hdfsFS fs, hdfsFile file) + : filename_(fname), hdfs_(hdfs), fs_(fs), file_(file) {} + + ~HDFSWritableFile() override { + if (file_ != nullptr) { + Close(); + } + } + + Status Append(const StringPiece& data) override { + if (hdfs_->hdfsWrite(fs_, file_, data.data(), + static_cast<tSize>(data.size())) == -1) { + return IOError(filename_, errno); + } + return Status::OK(); + } + + Status Close() override { + Status result; + if (hdfs_->hdfsCloseFile(fs_, file_) != 0) { + result = IOError(filename_, errno); + } + hdfs_ = nullptr; + fs_ = nullptr; + file_ = nullptr; + return result; + } + + Status Flush() override { + if (hdfs_->hdfsFlush(fs_, file_) != 0) { + return IOError(filename_, errno); + } + return Status::OK(); + } + + Status Sync() override { + if (hdfs_->hdfsHSync(fs_, file_) != 0) { + return IOError(filename_, errno); + } + return Status::OK(); + } + + private: + string filename_; + LibHDFS* hdfs_; + hdfsFS fs_; + hdfsFile file_; +}; + +Status HadoopFileSystem::NewWritableFile( + const string& fname, std::unique_ptr<WritableFile>* result) { + hdfsFS fs = nullptr; + TF_RETURN_IF_ERROR(Connect(fname, &fs)); + + hdfsFile file = + hdfs_->hdfsOpenFile(fs, TranslateName(fname).c_str(), O_WRONLY, 0, 0, 0); + if (file == nullptr) { + return IOError(fname, errno); + } + result->reset(new HDFSWritableFile(fname, hdfs_, fs, file)); + return Status::OK(); +} + +Status HadoopFileSystem::NewAppendableFile( + const string& fname, std::unique_ptr<WritableFile>* result) { + hdfsFS fs = nullptr; + TF_RETURN_IF_ERROR(Connect(fname, &fs)); + + hdfsFile file = hdfs_->hdfsOpenFile(fs, TranslateName(fname).c_str(), + O_WRONLY | O_APPEND, 0, 0, 0); + if (file == nullptr) { + return IOError(fname, errno); + } + result->reset(new HDFSWritableFile(fname, hdfs_, fs, file)); + return Status::OK(); +} + +Status HadoopFileSystem::NewReadOnlyMemoryRegionFromFile( + const string& fname, std::unique_ptr<ReadOnlyMemoryRegion>* result) { + // hadoopReadZero() technically supports this call with the following + // caveats: + // - It only works up to 2 GB. We'd have to Stat() the file to ensure that + // it fits. + // - If not on the local filesystem, the entire file will be read, making + // it inefficient for callers that assume typical mmap() behavior. + return errors::Unimplemented("HDFS does not support ReadOnlyMemoryRegion"); +} + +bool HadoopFileSystem::FileExists(const string& fname) { + hdfsFS fs = nullptr; + Status status = Connect(fname, &fs); + if (!status.ok()) { + LOG(ERROR) << "Connect failed: " << status.error_message(); + return false; + } + + return hdfs_->hdfsExists(fs, TranslateName(fname).c_str()) == 0; +} + +Status HadoopFileSystem::GetChildren(const string& dir, + std::vector<string>* result) { + result->clear(); + hdfsFS fs = nullptr; + TF_RETURN_IF_ERROR(Connect(dir, &fs)); + + // hdfsListDirectory returns nullptr if the directory is empty. Do a separate + // check to verify the directory exists first. + FileStatistics stat; + TF_RETURN_IF_ERROR(Stat(dir, &stat)); + + int entries = 0; + hdfsFileInfo* info = + hdfs_->hdfsListDirectory(fs, TranslateName(dir).c_str(), &entries); + if (info == nullptr) { + if (stat.is_directory) { + // Assume it's an empty directory. + return Status::OK(); + } + return IOError(dir, errno); + } + for (int i = 0; i < entries; i++) { + result->push_back(io::Basename(info[i].mName).ToString()); + } + hdfs_->hdfsFreeFileInfo(info, entries); + return Status::OK(); +} + +Status HadoopFileSystem::DeleteFile(const string& fname) { + hdfsFS fs = nullptr; + TF_RETURN_IF_ERROR(Connect(fname, &fs)); + + if (hdfs_->hdfsDelete(fs, TranslateName(fname).c_str(), + /*recursive=*/0) != 0) { + return IOError(fname, errno); + } + return Status::OK(); +} + +Status HadoopFileSystem::CreateDir(const string& dir) { + hdfsFS fs = nullptr; + TF_RETURN_IF_ERROR(Connect(dir, &fs)); + + if (hdfs_->hdfsCreateDirectory(fs, TranslateName(dir).c_str()) != 0) { + return IOError(dir, errno); + } + return Status::OK(); +} + +Status HadoopFileSystem::DeleteDir(const string& dir) { + hdfsFS fs = nullptr; + TF_RETURN_IF_ERROR(Connect(dir, &fs)); + + // Count the number of entries in the directory, and only delete if it's + // non-empty. This is consistent with the interface, but note that there's + // a race condition where a file may be added after this check, in which + // case the directory will still be deleted. + int entries = 0; + hdfsFileInfo* info = + hdfs_->hdfsListDirectory(fs, TranslateName(dir).c_str(), &entries); + if (info != nullptr) { + return IOError(dir, errno); + } + hdfs_->hdfsFreeFileInfo(info, entries); + + if (entries > 0) { + return errors::FailedPrecondition("Cannot delete a non-empty directory."); + } + if (hdfs_->hdfsDelete(fs, TranslateName(dir).c_str(), + /*recursive=*/1) != 0) { + return IOError(dir, errno); + } + return Status::OK(); +} + +Status HadoopFileSystem::GetFileSize(const string& fname, uint64* size) { + hdfsFS fs = nullptr; + TF_RETURN_IF_ERROR(Connect(fname, &fs)); + + hdfsFileInfo* info = hdfs_->hdfsGetPathInfo(fs, TranslateName(fname).c_str()); + if (info == nullptr) { + return IOError(fname, errno); + } + *size = static_cast<uint64>(info->mSize); + hdfs_->hdfsFreeFileInfo(info, 1); + return Status::OK(); +} + +Status HadoopFileSystem::RenameFile(const string& src, const string& target) { + hdfsFS fs = nullptr; + TF_RETURN_IF_ERROR(Connect(src, &fs)); + + if (hdfs_->hdfsRename(fs, TranslateName(src).c_str(), + TranslateName(target).c_str()) != 0) { + return IOError(src, errno); + } + return Status::OK(); +} + +Status HadoopFileSystem::Stat(const string& fname, FileStatistics* stats) { + hdfsFS fs = nullptr; + TF_RETURN_IF_ERROR(Connect(fname, &fs)); + + hdfsFileInfo* info = hdfs_->hdfsGetPathInfo(fs, TranslateName(fname).c_str()); + if (info == nullptr) { + return IOError(fname, errno); + } + stats->length = static_cast<int64>(info->mSize); + stats->mtime_nsec = static_cast<int64>(info->mLastMod) * 1e9; + stats->is_directory = info->mKind == kObjectKindDirectory; + hdfs_->hdfsFreeFileInfo(info, 1); + return Status::OK(); +} + +REGISTER_FILE_SYSTEM("hdfs", HadoopFileSystem); + +} // namespace tensorflow diff --git a/tensorflow/core/platform/hadoop/hadoop_file_system.h b/tensorflow/core/platform/hadoop/hadoop_file_system.h new file mode 100644 index 00000000000..182d6995fc7 --- /dev/null +++ b/tensorflow/core/platform/hadoop/hadoop_file_system.h @@ -0,0 +1,73 @@ +/* Copyright 2016 The TensorFlow Authors. All Rights Reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +==============================================================================*/ + +#ifndef THIRD_PARTY_TENSORFLOW_CORE_PLATFORM_HADOOP_HADOOP_FILE_SYSTEM_H_ +#define THIRD_PARTY_TENSORFLOW_CORE_PLATFORM_HADOOP_HADOOP_FILE_SYSTEM_H_ + +#include "tensorflow/core/platform/env.h" + +extern "C" { +struct hdfs_internal; +typedef hdfs_internal* hdfsFS; +} + +namespace tensorflow { + +class LibHDFS; + +class HadoopFileSystem : public FileSystem { + public: + HadoopFileSystem(); + ~HadoopFileSystem(); + + Status NewRandomAccessFile( + const string& fname, std::unique_ptr<RandomAccessFile>* result) override; + + Status NewWritableFile(const string& fname, + std::unique_ptr<WritableFile>* result) override; + + Status NewAppendableFile(const string& fname, + std::unique_ptr<WritableFile>* result) override; + + Status NewReadOnlyMemoryRegionFromFile( + const string& fname, + std::unique_ptr<ReadOnlyMemoryRegion>* result) override; + + bool FileExists(const string& fname) override; + + Status GetChildren(const string& dir, std::vector<string>* result) override; + + Status DeleteFile(const string& fname) override; + + Status CreateDir(const string& name) override; + + Status DeleteDir(const string& name) override; + + Status GetFileSize(const string& fname, uint64* size) override; + + Status RenameFile(const string& src, const string& target) override; + + Status Stat(const string& fname, FileStatistics* stat) override; + + string TranslateName(const string& name) const override; + + private: + Status Connect(StringPiece fname, hdfsFS* fs); + LibHDFS* hdfs_; +}; + +} // namespace tensorflow + +#endif // THIRD_PARTY_TENSORFLOW_CORE_PLATFORM_HADOOP_HADOOP_FILE_SYSTEM_H_ diff --git a/tensorflow/core/platform/hadoop/hadoop_file_system_test.cc b/tensorflow/core/platform/hadoop/hadoop_file_system_test.cc new file mode 100644 index 00000000000..f927e15752f --- /dev/null +++ b/tensorflow/core/platform/hadoop/hadoop_file_system_test.cc @@ -0,0 +1,186 @@ +/* Copyright 2016 The TensorFlow Authors. All Rights Reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +==============================================================================*/ + +#include "tensorflow/core/platform/hadoop/hadoop_file_system.h" + +#include "tensorflow/core/lib/core/status_test_util.h" +#include "tensorflow/core/lib/gtl/stl_util.h" +#include "tensorflow/core/lib/io/path.h" +#include "tensorflow/core/platform/file_system.h" +#include "tensorflow/core/platform/test.h" + +namespace tensorflow { +namespace { + +class HadoopFileSystemTest : public ::testing::Test { + protected: + HadoopFileSystemTest() {} + + Status WriteString(const string& fname, const string& content) { + std::unique_ptr<WritableFile> writer; + TF_RETURN_IF_ERROR(hdfs.NewWritableFile(fname, &writer)); + TF_RETURN_IF_ERROR(writer->Append(content)); + TF_RETURN_IF_ERROR(writer->Close()); + return Status::OK(); + } + + Status ReadAll(const string& fname, string* content) { + std::unique_ptr<RandomAccessFile> reader; + TF_RETURN_IF_ERROR(hdfs.NewRandomAccessFile(fname, &reader)); + + uint64 file_size = 0; + TF_RETURN_IF_ERROR(hdfs.GetFileSize(fname, &file_size)); + + content->resize(file_size); + StringPiece result; + TF_RETURN_IF_ERROR( + reader->Read(0, file_size, &result, gtl::string_as_array(content))); + if (file_size != result.size()) { + return errors::DataLoss("expected ", file_size, " got ", result.size(), + " bytes"); + } + return Status::OK(); + } + + HadoopFileSystem hdfs; +}; + +TEST_F(HadoopFileSystemTest, RandomAccessFile) { + const string fname = io::JoinPath("hdfs://localfilesystem", testing::TmpDir(), + "RandomAccessFile"); + const string content = "abcdefghijklmn"; + TF_ASSERT_OK(WriteString(fname, content)); + + std::unique_ptr<RandomAccessFile> reader; + TF_EXPECT_OK(hdfs.NewRandomAccessFile(fname, &reader)); + + string got; + got.resize(content.size()); + StringPiece result; + TF_EXPECT_OK( + reader->Read(0, content.size(), &result, gtl::string_as_array(&got))); + EXPECT_EQ(content.size(), result.size()); + EXPECT_EQ(content, result); + + got.clear(); + got.resize(4); + TF_EXPECT_OK(reader->Read(2, 4, &result, gtl::string_as_array(&got))); + EXPECT_EQ(4, result.size()); + EXPECT_EQ(content.substr(2, 4), result); +} + +TEST_F(HadoopFileSystemTest, WritableFile) { + std::unique_ptr<WritableFile> writer; + const string fname = + io::JoinPath("hdfs://localfilesystem", testing::TmpDir(), "WritableFile"); + TF_EXPECT_OK(hdfs.NewWritableFile(fname, &writer)); + TF_EXPECT_OK(writer->Append("content1,")); + TF_EXPECT_OK(writer->Append("content2")); + TF_EXPECT_OK(writer->Flush()); + TF_EXPECT_OK(writer->Sync()); + TF_EXPECT_OK(writer->Close()); + + string content; + TF_EXPECT_OK(ReadAll(fname, &content)); + EXPECT_EQ("content1,content2", content); +} + +TEST_F(HadoopFileSystemTest, FileExists) { + const string fname = + io::JoinPath("hdfs://localfilesystem", testing::TmpDir(), "FileExists"); + EXPECT_FALSE(hdfs.FileExists(fname)); + TF_ASSERT_OK(WriteString(fname, "test")); + EXPECT_TRUE(hdfs.FileExists(fname)); +} + +TEST_F(HadoopFileSystemTest, GetChildren) { + const string base = + io::JoinPath("hdfs://localfilesystem", testing::TmpDir(), "GetChildren"); + TF_EXPECT_OK(hdfs.CreateDir(base)); + + const string file = io::JoinPath(base, "testfile.csv"); + TF_EXPECT_OK(WriteString(file, "blah")); + const string subdir = io::JoinPath(base, "subdir"); + TF_EXPECT_OK(hdfs.CreateDir(subdir)); + + vector<string> children; + TF_EXPECT_OK(hdfs.GetChildren(base, &children)); + std::sort(children.begin(), children.end()); + EXPECT_EQ(vector<string>({"subdir", "testfile.csv"}), children); +} + +TEST_F(HadoopFileSystemTest, DeleteFile) { + const string fname = + io::JoinPath("hdfs://localfilesystem", testing::TmpDir(), "DeleteFile"); + EXPECT_FALSE(hdfs.DeleteFile(fname).ok()); + TF_ASSERT_OK(WriteString(fname, "test")); + TF_EXPECT_OK(hdfs.DeleteFile(fname)); +} + +TEST_F(HadoopFileSystemTest, GetFileSize) { + const string fname = + io::JoinPath("hdfs://localfilesystem", testing::TmpDir(), "GetFileSize"); + TF_ASSERT_OK(WriteString(fname, "test")); + uint64 file_size = 0; + TF_EXPECT_OK(hdfs.GetFileSize(fname, &file_size)); + EXPECT_EQ(4, file_size); +} + +TEST_F(HadoopFileSystemTest, CreateDirStat) { + const string dir = io::JoinPath("hdfs://localfilesystem", testing::TmpDir(), + "CreateDirStat"); + TF_EXPECT_OK(hdfs.CreateDir(dir)); + FileStatistics stat; + TF_EXPECT_OK(hdfs.Stat(dir, &stat)); + EXPECT_TRUE(stat.is_directory); +} + +TEST_F(HadoopFileSystemTest, DeleteDir) { + const string dir = + io::JoinPath("hdfs://localfilesystem", testing::TmpDir(), "DeleteDir"); + EXPECT_FALSE(hdfs.DeleteDir(dir).ok()); + TF_EXPECT_OK(hdfs.CreateDir(dir)); + TF_EXPECT_OK(hdfs.DeleteDir(dir)); + FileStatistics stat; + EXPECT_FALSE(hdfs.Stat(dir, &stat).ok()); +} + +TEST_F(HadoopFileSystemTest, RenameFile) { + const string fname1 = + io::JoinPath("hdfs://localfilesystem", testing::TmpDir(), "RenameFile1"); + const string fname2 = + io::JoinPath("hdfs://localfilesystem", testing::TmpDir(), "RenameFile2"); + TF_ASSERT_OK(WriteString(fname1, "test")); + TF_EXPECT_OK(hdfs.RenameFile(fname1, fname2)); + string content; + TF_EXPECT_OK(ReadAll(fname2, &content)); + EXPECT_EQ("test", content); +} + +TEST_F(HadoopFileSystemTest, StatFile) { + const string fname = + io::JoinPath("hdfs://localfilesystem", testing::TmpDir(), "StatFile"); + TF_ASSERT_OK(WriteString(fname, "test")); + FileStatistics stat; + TF_EXPECT_OK(hdfs.Stat(fname, &stat)); + EXPECT_EQ(4, stat.length); + EXPECT_FALSE(stat.is_directory); +} + +// NewAppendableFile() is not testable. Local filesystem maps to +// ChecksumFileSystem in Hadoop, where appending is an unsupported operation. + +} // namespace +} // namespace tensorflow diff --git a/tensorflow/core/platform/posix/error.cc b/tensorflow/core/platform/posix/error.cc new file mode 100644 index 00000000000..30d135a2556 --- /dev/null +++ b/tensorflow/core/platform/posix/error.cc @@ -0,0 +1,163 @@ +/* Copyright 2016 The TensorFlow Authors. All Rights Reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +==============================================================================*/ + +#include "tensorflow/core/platform/posix/error.h" + +#include <errno.h> +#include <string.h> + +#include "tensorflow/core/lib/core/status.h" +#include "tensorflow/core/lib/strings/strcat.h" + +namespace tensorflow { + +error::Code ErrnoToCode(int err_number) { + error::Code code; + switch (err_number) { + case 0: + code = error::OK; + break; + case EINVAL: // Invalid argument + case ENAMETOOLONG: // Filename too long + case E2BIG: // Argument list too long + case EDESTADDRREQ: // Destination address required + case EDOM: // Mathematics argument out of domain of function + case EFAULT: // Bad address + case EILSEQ: // Illegal byte sequence + case ENOPROTOOPT: // Protocol not available + case ENOSTR: // Not a STREAM + case ENOTSOCK: // Not a socket + case ENOTTY: // Inappropriate I/O control operation + case EPROTOTYPE: // Protocol wrong type for socket + case ESPIPE: // Invalid seek + code = error::INVALID_ARGUMENT; + break; + case ETIMEDOUT: // Connection timed out + case ETIME: // Timer expired + code = error::DEADLINE_EXCEEDED; + break; + case ENODEV: // No such device + case ENOENT: // No such file or directory + case ENXIO: // No such device or address + case ESRCH: // No such process + code = error::NOT_FOUND; + break; + case EEXIST: // File exists + case EADDRNOTAVAIL: // Address not available + case EALREADY: // Connection already in progress + code = error::ALREADY_EXISTS; + break; + case EPERM: // Operation not permitted + case EACCES: // Permission denied + case EROFS: // Read only file system + code = error::PERMISSION_DENIED; + break; + case ENOTEMPTY: // Directory not empty + case EISDIR: // Is a directory + case ENOTDIR: // Not a directory + case EADDRINUSE: // Address already in use + case EBADF: // Invalid file descriptor + case EBUSY: // Device or resource busy + case ECHILD: // No child processes + case EISCONN: // Socket is connected + case ENOTBLK: // Block device required + case ENOTCONN: // The socket is not connected + case EPIPE: // Broken pipe + case ESHUTDOWN: // Cannot send after transport endpoint shutdown + case ETXTBSY: // Text file busy + code = error::FAILED_PRECONDITION; + break; + case ENOSPC: // No space left on device + case EDQUOT: // Disk quota exceeded + case EMFILE: // Too many open files + case EMLINK: // Too many links + case ENFILE: // Too many open files in system + case ENOBUFS: // No buffer space available + case ENODATA: // No message is available on the STREAM read queue + case ENOMEM: // Not enough space + case ENOSR: // No STREAM resources + case EUSERS: // Too many users + code = error::RESOURCE_EXHAUSTED; + break; + case EFBIG: // File too large + case EOVERFLOW: // Value too large to be stored in data type + case ERANGE: // Result too large + code = error::OUT_OF_RANGE; + break; + case ENOSYS: // Function not implemented + case ENOTSUP: // Operation not supported + case EAFNOSUPPORT: // Address family not supported + case EPFNOSUPPORT: // Protocol family not supported + case EPROTONOSUPPORT: // Protocol not supported + case ESOCKTNOSUPPORT: // Socket type not supported + case EXDEV: // Improper link + code = error::UNIMPLEMENTED; + break; + case EAGAIN: // Resource temporarily unavailable + case ECONNREFUSED: // Connection refused + case ECONNABORTED: // Connection aborted + case ECONNRESET: // Connection reset + case EINTR: // Interrupted function call + case EHOSTDOWN: // Host is down + case EHOSTUNREACH: // Host is unreachable + case ENETDOWN: // Network is down + case ENETRESET: // Connection aborted by network + case ENETUNREACH: // Network unreachable + case ENOLCK: // No locks available + case ENOLINK: // Link has been severed +#if !defined(__APPLE__) + case ENONET: // Machine is not on the network +#endif + code = error::UNAVAILABLE; + break; + case EDEADLK: // Resource deadlock avoided + case ESTALE: // Stale file handle + code = error::ABORTED; + break; + case ECANCELED: // Operation cancelled + code = error::CANCELLED; + break; + // NOTE: If you get any of the following (especially in a + // reproducible way) and can propose a better mapping, + // please email the owners about updating this mapping. + case EBADMSG: // Bad message + case EIDRM: // Identifier removed + case EINPROGRESS: // Operation in progress + case EIO: // I/O error + case ELOOP: // Too many levels of symbolic links + case ENOEXEC: // Exec format error + case ENOMSG: // No message of the desired type + case EPROTO: // Protocol error + case EREMOTE: // Object is remote + code = error::UNKNOWN; + break; + default: { + code = error::UNKNOWN; + break; + } + } + return code; +} + +Status IOError(const string& context, int err_number) { + auto code = ErrnoToCode(err_number); + if (code == error::UNKNOWN) { + return Status(code, strings::StrCat(context, "; ", strerror(err_number))); + } else { + return Status(code, context); + } +} + +} // namespace tensorflow diff --git a/tensorflow/core/platform/posix/error.h b/tensorflow/core/platform/posix/error.h new file mode 100644 index 00000000000..9b614d0f702 --- /dev/null +++ b/tensorflow/core/platform/posix/error.h @@ -0,0 +1,27 @@ +/* Copyright 2016 The TensorFlow Authors. All Rights Reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +==============================================================================*/ + +#ifndef TENSORFLOW_CORE_PLATFORM_POSIX_ERROR_H_ +#define TENSORFLOW_CORE_PLATFORM_POSIX_ERROR_H_ + +#include "tensorflow/core/lib/core/status.h" + +namespace tensorflow { + +Status IOError(const string& context, int err_number); + +} // namespace tensorflow + +#endif // TENSORFLOW_CORE_PLATFORM_POSIX_POSIX_FILE_SYSTEM_H_ diff --git a/tensorflow/core/platform/posix/posix_file_system.cc b/tensorflow/core/platform/posix/posix_file_system.cc index 7ec8d16d33e..4bd8d84d6c3 100644 --- a/tensorflow/core/platform/posix/posix_file_system.cc +++ b/tensorflow/core/platform/posix/posix_file_system.cc @@ -25,143 +25,15 @@ limitations under the License. #include <unistd.h> #include "tensorflow/core/lib/core/error_codes.pb.h" +#include "tensorflow/core/lib/core/status.h" #include "tensorflow/core/lib/strings/strcat.h" #include "tensorflow/core/platform/env.h" #include "tensorflow/core/platform/logging.h" +#include "tensorflow/core/platform/posix/error.h" #include "tensorflow/core/platform/posix/posix_file_system.h" namespace tensorflow { -namespace { - -error::Code ErrnoToCode(int err_number) { - error::Code code; - switch (err_number) { - case 0: - code = error::OK; - break; - case EINVAL: // Invalid argument - case ENAMETOOLONG: // Filename too long - case E2BIG: // Argument list too long - case EDESTADDRREQ: // Destination address required - case EDOM: // Mathematics argument out of domain of function - case EFAULT: // Bad address - case EILSEQ: // Illegal byte sequence - case ENOPROTOOPT: // Protocol not available - case ENOSTR: // Not a STREAM - case ENOTSOCK: // Not a socket - case ENOTTY: // Inappropriate I/O control operation - case EPROTOTYPE: // Protocol wrong type for socket - case ESPIPE: // Invalid seek - code = error::INVALID_ARGUMENT; - break; - case ETIMEDOUT: // Connection timed out - case ETIME: // Timer expired - code = error::DEADLINE_EXCEEDED; - break; - case ENODEV: // No such device - case ENOENT: // No such file or directory - case ENXIO: // No such device or address - case ESRCH: // No such process - code = error::NOT_FOUND; - break; - case EEXIST: // File exists - case EADDRNOTAVAIL: // Address not available - case EALREADY: // Connection already in progress - code = error::ALREADY_EXISTS; - break; - case EPERM: // Operation not permitted - case EACCES: // Permission denied - case EROFS: // Read only file system - code = error::PERMISSION_DENIED; - break; - case ENOTEMPTY: // Directory not empty - case EISDIR: // Is a directory - case ENOTDIR: // Not a directory - case EADDRINUSE: // Address already in use - case EBADF: // Invalid file descriptor - case EBUSY: // Device or resource busy - case ECHILD: // No child processes - case EISCONN: // Socket is connected - case ENOTBLK: // Block device required - case ENOTCONN: // The socket is not connected - case EPIPE: // Broken pipe - case ESHUTDOWN: // Cannot send after transport endpoint shutdown - case ETXTBSY: // Text file busy - code = error::FAILED_PRECONDITION; - break; - case ENOSPC: // No space left on device - case EDQUOT: // Disk quota exceeded - case EMFILE: // Too many open files - case EMLINK: // Too many links - case ENFILE: // Too many open files in system - case ENOBUFS: // No buffer space available - case ENODATA: // No message is available on the STREAM read queue - case ENOMEM: // Not enough space - case ENOSR: // No STREAM resources - case EUSERS: // Too many users - code = error::RESOURCE_EXHAUSTED; - break; - case EFBIG: // File too large - case EOVERFLOW: // Value too large to be stored in data type - case ERANGE: // Result too large - code = error::OUT_OF_RANGE; - break; - case ENOSYS: // Function not implemented - case ENOTSUP: // Operation not supported - case EAFNOSUPPORT: // Address family not supported - case EPFNOSUPPORT: // Protocol family not supported - case EPROTONOSUPPORT: // Protocol not supported - case ESOCKTNOSUPPORT: // Socket type not supported - case EXDEV: // Improper link - code = error::UNIMPLEMENTED; - break; - case EAGAIN: // Resource temporarily unavailable - case ECONNREFUSED: // Connection refused - case ECONNABORTED: // Connection aborted - case ECONNRESET: // Connection reset - case EINTR: // Interrupted function call - case EHOSTDOWN: // Host is down - case EHOSTUNREACH: // Host is unreachable - case ENETDOWN: // Network is down - case ENETRESET: // Connection aborted by network - case ENETUNREACH: // Network unreachable - case ENOLCK: // No locks available - case ENOLINK: // Link has been severed -#if !defined(__APPLE__) - case ENONET: // Machine is not on the network -#endif - code = error::UNAVAILABLE; - break; - case EDEADLK: // Resource deadlock avoided - case ESTALE: // Stale file handle - code = error::ABORTED; - break; - case ECANCELED: // Operation cancelled - code = error::CANCELLED; - break; - // NOTE: If you get any of the following (especially in a - // reproducible way) and can propose a better mapping, - // please email the owners about updating this mapping. - case EBADMSG: // Bad message - case EIDRM: // Identifier removed - case EINPROGRESS: // Operation in progress - case EIO: // I/O error - case ELOOP: // Too many levels of symbolic links - case ENOEXEC: // Exec format error - case ENOMSG: // No message of the desired type - case EPROTO: // Protocol error - case EREMOTE: // Object is remote - code = error::UNKNOWN; - break; - default: { - code = error::UNKNOWN; - break; - } - } - return code; -} - // pread() based random-access class PosixRandomAccessFile : public RandomAccessFile { private: @@ -258,8 +130,6 @@ class PosixReadOnlyMemoryRegion : public ReadOnlyMemoryRegion { const uint64 length_; }; -} // namespace - Status PosixFileSystem::NewRandomAccessFile( const string& fname, std::unique_ptr<RandomAccessFile>* result) { string translated_fname = TranslateName(fname); @@ -401,13 +271,4 @@ Status PosixFileSystem::RenameFile(const string& src, const string& target) { return result; } -Status IOError(const string& context, int err_number) { - auto code = ErrnoToCode(err_number); - if (code == error::UNKNOWN) { - return Status(code, strings::StrCat(context, "; ", strerror(err_number))); - } else { - return Status(code, context); - } -} - } // namespace tensorflow diff --git a/tensorflow/third_party/hadoop/BUILD b/tensorflow/third_party/hadoop/BUILD new file mode 100644 index 00000000000..f25208c4167 --- /dev/null +++ b/tensorflow/third_party/hadoop/BUILD @@ -0,0 +1,20 @@ +package(default_visibility = ["//visibility:public"]) + +licenses(["notice"]) # Apache 2.0 + +filegroup( + name = "all_files", + srcs = glob( + ["**/*"], + exclude = [ + "**/METADATA", + "**/OWNERS", + ], + ), + visibility = ["//tensorflow:__subpackages__"], +) + +cc_library( + name = "hdfs", + hdrs = ["hdfs.h"], +) diff --git a/tensorflow/third_party/hadoop/hdfs.h b/tensorflow/third_party/hadoop/hdfs.h new file mode 100644 index 00000000000..560d8bba0e0 --- /dev/null +++ b/tensorflow/third_party/hadoop/hdfs.h @@ -0,0 +1,911 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#ifndef LIBHDFS_HDFS_H +#define LIBHDFS_HDFS_H + +#include <errno.h> /* for EINTERNAL, etc. */ +#include <fcntl.h> /* for O_RDONLY, O_WRONLY */ +#include <stdint.h> /* for uint64_t, etc. */ +#include <time.h> /* for time_t */ + +/* + * Support export of DLL symbols during libhdfs build, and import of DLL symbols + * during client application build. A client application may optionally define + * symbol LIBHDFS_DLL_IMPORT in its build. This is not strictly required, but + * the compiler can produce more efficient code with it. + */ +#ifdef WIN32 +#ifdef LIBHDFS_DLL_EXPORT +#define LIBHDFS_EXTERNAL __declspec(dllexport) +#elif LIBHDFS_DLL_IMPORT +#define LIBHDFS_EXTERNAL __declspec(dllimport) +#else +#define LIBHDFS_EXTERNAL +#endif +#else +#ifdef LIBHDFS_DLL_EXPORT +#define LIBHDFS_EXTERNAL __attribute__((visibility("default"))) +#elif LIBHDFS_DLL_IMPORT +#define LIBHDFS_EXTERNAL __attribute__((visibility("default"))) +#else +#define LIBHDFS_EXTERNAL +#endif +#endif + +#ifndef O_RDONLY +#define O_RDONLY 1 +#endif + +#ifndef O_WRONLY +#define O_WRONLY 2 +#endif + +#ifndef EINTERNAL +#define EINTERNAL 255 +#endif + +#define ELASTIC_BYTE_BUFFER_POOL_CLASS \ + "org/apache/hadoop/io/ElasticByteBufferPool" + +/** All APIs set errno to meaningful values */ + +#ifdef __cplusplus +extern "C" { +#endif +/** + * Some utility decls used in libhdfs. + */ +struct hdfsBuilder; +typedef int32_t tSize; /// size of data for read/write io ops +typedef time_t tTime; /// time type in seconds +typedef int64_t tOffset; /// offset within the file +typedef uint16_t tPort; /// port +typedef enum tObjectKind { + kObjectKindFile = 'F', + kObjectKindDirectory = 'D', +} tObjectKind; + +/** + * The C reflection of org.apache.org.hadoop.FileSystem . + */ +struct hdfs_internal; +typedef struct hdfs_internal *hdfsFS; + +struct hdfsFile_internal; +typedef struct hdfsFile_internal *hdfsFile; + +struct hadoopRzOptions; + +struct hadoopRzBuffer; + +/** + * Determine if a file is open for read. + * + * @param file The HDFS file + * @return 1 if the file is open for read; 0 otherwise + */ +LIBHDFS_EXTERNAL +int hdfsFileIsOpenForRead(hdfsFile file); + +/** + * Determine if a file is open for write. + * + * @param file The HDFS file + * @return 1 if the file is open for write; 0 otherwise + */ +LIBHDFS_EXTERNAL +int hdfsFileIsOpenForWrite(hdfsFile file); + +struct hdfsReadStatistics { + uint64_t totalBytesRead; + uint64_t totalLocalBytesRead; + uint64_t totalShortCircuitBytesRead; + uint64_t totalZeroCopyBytesRead; +}; + +/** + * Get read statistics about a file. This is only applicable to files + * opened for reading. + * + * @param file The HDFS file + * @param stats (out parameter) on a successful return, the read + * statistics. Unchanged otherwise. You must free the + * returned statistics with hdfsFileFreeReadStatistics. + * @return 0 if the statistics were successfully returned, + * -1 otherwise. On a failure, please check errno against + * ENOTSUP. webhdfs, LocalFilesystem, and so forth may + * not support read statistics. + */ +LIBHDFS_EXTERNAL +int hdfsFileGetReadStatistics(hdfsFile file, struct hdfsReadStatistics **stats); + +/** + * @param stats HDFS read statistics for a file. + * + * @return the number of remote bytes read. + */ +LIBHDFS_EXTERNAL +int64_t hdfsReadStatisticsGetRemoteBytesRead( + const struct hdfsReadStatistics *stats); + +/** + * Clear the read statistics for a file. + * + * @param file The file to clear the read statistics of. + * + * @return 0 on success; the error code otherwise. + * EINVAL: the file is not open for reading. + * ENOTSUP: the file does not support clearing the read + * statistics. + * Errno will also be set to this code on failure. + */ +LIBHDFS_EXTERNAL +int hdfsFileClearReadStatistics(hdfsFile file); + +/** + * Free some HDFS read statistics. + * + * @param stats The HDFS read statistics to free. + */ +LIBHDFS_EXTERNAL +void hdfsFileFreeReadStatistics(struct hdfsReadStatistics *stats); + +/** + * hdfsConnectAsUser - Connect to a hdfs file system as a specific user + * Connect to the hdfs. + * @param nn The NameNode. See hdfsBuilderSetNameNode for details. + * @param port The port on which the server is listening. + * @param user the user name (this is hadoop domain user). Or NULL is equivelant + * to hhdfsConnect(host, port) + * @return Returns a handle to the filesystem or NULL on error. + * @deprecated Use hdfsBuilderConnect instead. + */ +LIBHDFS_EXTERNAL +hdfsFS hdfsConnectAsUser(const char *nn, tPort port, const char *user); + +/** + * hdfsConnect - Connect to a hdfs file system. + * Connect to the hdfs. + * @param nn The NameNode. See hdfsBuilderSetNameNode for details. + * @param port The port on which the server is listening. + * @return Returns a handle to the filesystem or NULL on error. + * @deprecated Use hdfsBuilderConnect instead. + */ +LIBHDFS_EXTERNAL +hdfsFS hdfsConnect(const char *nn, tPort port); + +/** + * hdfsConnect - Connect to an hdfs file system. + * + * Forces a new instance to be created + * + * @param nn The NameNode. See hdfsBuilderSetNameNode for details. + * @param port The port on which the server is listening. + * @param user The user name to use when connecting + * @return Returns a handle to the filesystem or NULL on error. + * @deprecated Use hdfsBuilderConnect instead. + */ +LIBHDFS_EXTERNAL +hdfsFS hdfsConnectAsUserNewInstance(const char *nn, tPort port, + const char *user); + +/** + * hdfsConnect - Connect to an hdfs file system. + * + * Forces a new instance to be created + * + * @param nn The NameNode. See hdfsBuilderSetNameNode for details. + * @param port The port on which the server is listening. + * @return Returns a handle to the filesystem or NULL on error. + * @deprecated Use hdfsBuilderConnect instead. + */ +LIBHDFS_EXTERNAL +hdfsFS hdfsConnectNewInstance(const char *nn, tPort port); + +/** + * Connect to HDFS using the parameters defined by the builder. + * + * The HDFS builder will be freed, whether or not the connection was + * successful. + * + * Every successful call to hdfsBuilderConnect should be matched with a call + * to hdfsDisconnect, when the hdfsFS is no longer needed. + * + * @param bld The HDFS builder + * @return Returns a handle to the filesystem, or NULL on error. + */ +LIBHDFS_EXTERNAL +hdfsFS hdfsBuilderConnect(struct hdfsBuilder *bld); + +/** + * Create an HDFS builder. + * + * @return The HDFS builder, or NULL on error. + */ +LIBHDFS_EXTERNAL +struct hdfsBuilder *hdfsNewBuilder(void); + +/** + * Force the builder to always create a new instance of the FileSystem, + * rather than possibly finding one in the cache. + * + * @param bld The HDFS builder + */ +LIBHDFS_EXTERNAL +void hdfsBuilderSetForceNewInstance(struct hdfsBuilder *bld); + +/** + * Set the HDFS NameNode to connect to. + * + * @param bld The HDFS builder + * @param nn The NameNode to use. + * + * If the string given is 'default', the default NameNode + * configuration will be used (from the XML configuration files) + * + * If NULL is given, a LocalFileSystem will be created. + * + * If the string starts with a protocol type such as file:// or + * hdfs://, this protocol type will be used. If not, the + * hdfs:// protocol type will be used. + * + * You may specify a NameNode port in the usual way by + * passing a string of the format hdfs://<hostname>:<port>. + * Alternately, you may set the port with + * hdfsBuilderSetNameNodePort. However, you must not pass the + * port in two different ways. + */ +LIBHDFS_EXTERNAL +void hdfsBuilderSetNameNode(struct hdfsBuilder *bld, const char *nn); + +/** + * Set the port of the HDFS NameNode to connect to. + * + * @param bld The HDFS builder + * @param port The port. + */ +LIBHDFS_EXTERNAL +void hdfsBuilderSetNameNodePort(struct hdfsBuilder *bld, tPort port); + +/** + * Set the username to use when connecting to the HDFS cluster. + * + * @param bld The HDFS builder + * @param userName The user name. The string will be shallow-copied. + */ +LIBHDFS_EXTERNAL +void hdfsBuilderSetUserName(struct hdfsBuilder *bld, const char *userName); + +/** + * Set the path to the Kerberos ticket cache to use when connecting to + * the HDFS cluster. + * + * @param bld The HDFS builder + * @param kerbTicketCachePath The Kerberos ticket cache path. The string + * will be shallow-copied. + */ +LIBHDFS_EXTERNAL +void hdfsBuilderSetKerbTicketCachePath(struct hdfsBuilder *bld, + const char *kerbTicketCachePath); + +/** + * Free an HDFS builder. + * + * It is normally not necessary to call this function since + * hdfsBuilderConnect frees the builder. + * + * @param bld The HDFS builder + */ +LIBHDFS_EXTERNAL +void hdfsFreeBuilder(struct hdfsBuilder *bld); + +/** + * Set a configuration string for an HdfsBuilder. + * + * @param key The key to set. + * @param val The value, or NULL to set no value. + * This will be shallow-copied. You are responsible for + * ensuring that it remains valid until the builder is + * freed. + * + * @return 0 on success; nonzero error code otherwise. + */ +LIBHDFS_EXTERNAL +int hdfsBuilderConfSetStr(struct hdfsBuilder *bld, const char *key, + const char *val); + +/** + * Get a configuration string. + * + * @param key The key to find + * @param val (out param) The value. This will be set to NULL if the + * key isn't found. You must free this string with + * hdfsConfStrFree. + * + * @return 0 on success; nonzero error code otherwise. + * Failure to find the key is not an error. + */ +LIBHDFS_EXTERNAL +int hdfsConfGetStr(const char *key, char **val); + +/** + * Get a configuration integer. + * + * @param key The key to find + * @param val (out param) The value. This will NOT be changed if the + * key isn't found. + * + * @return 0 on success; nonzero error code otherwise. + * Failure to find the key is not an error. + */ +LIBHDFS_EXTERNAL +int hdfsConfGetInt(const char *key, int32_t *val); + +/** + * Free a configuration string found with hdfsConfGetStr. + * + * @param val A configuration string obtained from hdfsConfGetStr + */ +LIBHDFS_EXTERNAL +void hdfsConfStrFree(char *val); + +/** + * hdfsDisconnect - Disconnect from the hdfs file system. + * Disconnect from hdfs. + * @param fs The configured filesystem handle. + * @return Returns 0 on success, -1 on error. + * Even if there is an error, the resources associated with the + * hdfsFS will be freed. + */ +LIBHDFS_EXTERNAL +int hdfsDisconnect(hdfsFS fs); + +/** + * hdfsOpenFile - Open a hdfs file in given mode. + * @param fs The configured filesystem handle. + * @param path The full path to the file. + * @param flags - an | of bits/fcntl.h file flags - supported flags are + * O_RDONLY, O_WRONLY (meaning create or overwrite i.e., implies O_TRUNCAT), + * O_WRONLY|O_APPEND. Other flags are generally ignored other than (O_RDWR || + * (O_EXCL & O_CREAT)) which return NULL and set errno equal ENOTSUP. + * @param bufferSize Size of buffer for read/write - pass 0 if you want + * to use the default configured values. + * @param replication Block replication - pass 0 if you want to use + * the default configured values. + * @param blocksize Size of block - pass 0 if you want to use the + * default configured values. + * @return Returns the handle to the open file or NULL on error. + */ +LIBHDFS_EXTERNAL +hdfsFile hdfsOpenFile(hdfsFS fs, const char *path, int flags, int bufferSize, + short replication, tSize blocksize); + +/** + * hdfsTruncateFile - Truncate a hdfs file to given lenght. + * @param fs The configured filesystem handle. + * @param path The full path to the file. + * @param newlength The size the file is to be truncated to + * @return 1 if the file has been truncated to the desired newlength + * and is immediately available to be reused for write operations + * such as append. + * 0 if a background process of adjusting the length of the last + * block has been started, and clients should wait for it to + * complete before proceeding with further file updates. + * -1 on error. + */ +int hdfsTruncateFile(hdfsFS fs, const char *path, tOffset newlength); + +/** + * hdfsUnbufferFile - Reduce the buffering done on a file. + * + * @param file The file to unbuffer. + * @return 0 on success + * ENOTSUP if the file does not support unbuffering + * Errno will also be set to this value. + */ +LIBHDFS_EXTERNAL +int hdfsUnbufferFile(hdfsFile file); + +/** + * hdfsCloseFile - Close an open file. + * @param fs The configured filesystem handle. + * @param file The file handle. + * @return Returns 0 on success, -1 on error. + * On error, errno will be set appropriately. + * If the hdfs file was valid, the memory associated with it will + * be freed at the end of this call, even if there was an I/O + * error. + */ +LIBHDFS_EXTERNAL +int hdfsCloseFile(hdfsFS fs, hdfsFile file); + +/** + * hdfsExists - Checks if a given path exsits on the filesystem + * @param fs The configured filesystem handle. + * @param path The path to look for + * @return Returns 0 on success, -1 on error. + */ +LIBHDFS_EXTERNAL +int hdfsExists(hdfsFS fs, const char *path); + +/** + * hdfsSeek - Seek to given offset in file. + * This works only for files opened in read-only mode. + * @param fs The configured filesystem handle. + * @param file The file handle. + * @param desiredPos Offset into the file to seek into. + * @return Returns 0 on success, -1 on error. + */ +LIBHDFS_EXTERNAL +int hdfsSeek(hdfsFS fs, hdfsFile file, tOffset desiredPos); + +/** + * hdfsTell - Get the current offset in the file, in bytes. + * @param fs The configured filesystem handle. + * @param file The file handle. + * @return Current offset, -1 on error. + */ +LIBHDFS_EXTERNAL +tOffset hdfsTell(hdfsFS fs, hdfsFile file); + +/** + * hdfsRead - Read data from an open file. + * @param fs The configured filesystem handle. + * @param file The file handle. + * @param buffer The buffer to copy read bytes into. + * @param length The length of the buffer. + * @return On success, a positive number indicating how many bytes + * were read. + * On end-of-file, 0. + * On error, -1. Errno will be set to the error code. + * Just like the POSIX read function, hdfsRead will return -1 + * and set errno to EINTR if data is temporarily unavailable, + * but we are not yet at the end of the file. + */ +LIBHDFS_EXTERNAL +tSize hdfsRead(hdfsFS fs, hdfsFile file, void *buffer, tSize length); + +/** + * hdfsPread - Positional read of data from an open file. + * @param fs The configured filesystem handle. + * @param file The file handle. + * @param position Position from which to read + * @param buffer The buffer to copy read bytes into. + * @param length The length of the buffer. + * @return See hdfsRead + */ +LIBHDFS_EXTERNAL +tSize hdfsPread(hdfsFS fs, hdfsFile file, tOffset position, void *buffer, + tSize length); + +/** + * hdfsWrite - Write data into an open file. + * @param fs The configured filesystem handle. + * @param file The file handle. + * @param buffer The data. + * @param length The no. of bytes to write. + * @return Returns the number of bytes written, -1 on error. + */ +LIBHDFS_EXTERNAL +tSize hdfsWrite(hdfsFS fs, hdfsFile file, const void *buffer, tSize length); + +/** + * hdfsWrite - Flush the data. + * @param fs The configured filesystem handle. + * @param file The file handle. + * @return Returns 0 on success, -1 on error. + */ +LIBHDFS_EXTERNAL +int hdfsFlush(hdfsFS fs, hdfsFile file); + +/** + * hdfsHFlush - Flush out the data in client's user buffer. After the + * return of this call, new readers will see the data. + * @param fs configured filesystem handle + * @param file file handle + * @return 0 on success, -1 on error and sets errno + */ +LIBHDFS_EXTERNAL +int hdfsHFlush(hdfsFS fs, hdfsFile file); + +/** + * hdfsHSync - Similar to posix fsync, Flush out the data in client's + * user buffer. all the way to the disk device (but the disk may have + * it in its cache). + * @param fs configured filesystem handle + * @param file file handle + * @return 0 on success, -1 on error and sets errno + */ +LIBHDFS_EXTERNAL +int hdfsHSync(hdfsFS fs, hdfsFile file); + +/** + * hdfsAvailable - Number of bytes that can be read from this + * input stream without blocking. + * @param fs The configured filesystem handle. + * @param file The file handle. + * @return Returns available bytes; -1 on error. + */ +LIBHDFS_EXTERNAL +int hdfsAvailable(hdfsFS fs, hdfsFile file); + +/** + * hdfsCopy - Copy file from one filesystem to another. + * @param srcFS The handle to source filesystem. + * @param src The path of source file. + * @param dstFS The handle to destination filesystem. + * @param dst The path of destination file. + * @return Returns 0 on success, -1 on error. + */ +LIBHDFS_EXTERNAL +int hdfsCopy(hdfsFS srcFS, const char *src, hdfsFS dstFS, const char *dst); + +/** + * hdfsMove - Move file from one filesystem to another. + * @param srcFS The handle to source filesystem. + * @param src The path of source file. + * @param dstFS The handle to destination filesystem. + * @param dst The path of destination file. + * @return Returns 0 on success, -1 on error. + */ +LIBHDFS_EXTERNAL +int hdfsMove(hdfsFS srcFS, const char *src, hdfsFS dstFS, const char *dst); + +/** + * hdfsDelete - Delete file. + * @param fs The configured filesystem handle. + * @param path The path of the file. + * @param recursive if path is a directory and set to + * non-zero, the directory is deleted else throws an exception. In + * case of a file the recursive argument is irrelevant. + * @return Returns 0 on success, -1 on error. + */ +LIBHDFS_EXTERNAL +int hdfsDelete(hdfsFS fs, const char *path, int recursive); + +/** + * hdfsRename - Rename file. + * @param fs The configured filesystem handle. + * @param oldPath The path of the source file. + * @param newPath The path of the destination file. + * @return Returns 0 on success, -1 on error. + */ +LIBHDFS_EXTERNAL +int hdfsRename(hdfsFS fs, const char *oldPath, const char *newPath); + +/** + * hdfsGetWorkingDirectory - Get the current working directory for + * the given filesystem. + * @param fs The configured filesystem handle. + * @param buffer The user-buffer to copy path of cwd into. + * @param bufferSize The length of user-buffer. + * @return Returns buffer, NULL on error. + */ +LIBHDFS_EXTERNAL +char *hdfsGetWorkingDirectory(hdfsFS fs, char *buffer, size_t bufferSize); + +/** + * hdfsSetWorkingDirectory - Set the working directory. All relative + * paths will be resolved relative to it. + * @param fs The configured filesystem handle. + * @param path The path of the new 'cwd'. + * @return Returns 0 on success, -1 on error. + */ +LIBHDFS_EXTERNAL +int hdfsSetWorkingDirectory(hdfsFS fs, const char *path); + +/** + * hdfsCreateDirectory - Make the given file and all non-existent + * parents into directories. + * @param fs The configured filesystem handle. + * @param path The path of the directory. + * @return Returns 0 on success, -1 on error. + */ +LIBHDFS_EXTERNAL +int hdfsCreateDirectory(hdfsFS fs, const char *path); + +/** + * hdfsSetReplication - Set the replication of the specified + * file to the supplied value + * @param fs The configured filesystem handle. + * @param path The path of the file. + * @return Returns 0 on success, -1 on error. + */ +LIBHDFS_EXTERNAL +int hdfsSetReplication(hdfsFS fs, const char *path, int16_t replication); + +/** + * hdfsFileInfo - Information about a file/directory. + */ +typedef struct { + tObjectKind mKind; /* file or directory */ + char *mName; /* the name of the file */ + tTime mLastMod; /* the last modification time for the file in seconds */ + tOffset mSize; /* the size of the file in bytes */ + short mReplication; /* the count of replicas */ + tOffset mBlockSize; /* the block size for the file */ + char *mOwner; /* the owner of the file */ + char *mGroup; /* the group associated with the file */ + short mPermissions; /* the permissions associated with the file */ + tTime mLastAccess; /* the last access time for the file in seconds */ +} hdfsFileInfo; + +/** + * hdfsListDirectory - Get list of files/directories for a given + * directory-path. hdfsFreeFileInfo should be called to deallocate memory. + * @param fs The configured filesystem handle. + * @param path The path of the directory. + * @param numEntries Set to the number of files/directories in path. + * @return Returns a dynamically-allocated array of hdfsFileInfo + * objects; NULL on error. + */ +LIBHDFS_EXTERNAL +hdfsFileInfo *hdfsListDirectory(hdfsFS fs, const char *path, int *numEntries); + +/** + * hdfsGetPathInfo - Get information about a path as a (dynamically + * allocated) single hdfsFileInfo struct. hdfsFreeFileInfo should be + * called when the pointer is no longer needed. + * @param fs The configured filesystem handle. + * @param path The path of the file. + * @return Returns a dynamically-allocated hdfsFileInfo object; + * NULL on error. + */ +LIBHDFS_EXTERNAL +hdfsFileInfo *hdfsGetPathInfo(hdfsFS fs, const char *path); + +/** + * hdfsFreeFileInfo - Free up the hdfsFileInfo array (including fields) + * @param hdfsFileInfo The array of dynamically-allocated hdfsFileInfo + * objects. + * @param numEntries The size of the array. + */ +LIBHDFS_EXTERNAL +void hdfsFreeFileInfo(hdfsFileInfo *hdfsFileInfo, int numEntries); + +/** + * hdfsFileIsEncrypted: determine if a file is encrypted based on its + * hdfsFileInfo. + * @return -1 if there was an error (errno will be set), 0 if the file is + * not encrypted, 1 if the file is encrypted. + */ +LIBHDFS_EXTERNAL +int hdfsFileIsEncrypted(hdfsFileInfo *hdfsFileInfo); + +/** + * hdfsGetHosts - Get hostnames where a particular block (determined by + * pos & blocksize) of a file is stored. The last element in the array + * is NULL. Due to replication, a single block could be present on + * multiple hosts. + * @param fs The configured filesystem handle. + * @param path The path of the file. + * @param start The start of the block. + * @param length The length of the block. + * @return Returns a dynamically-allocated 2-d array of blocks-hosts; + * NULL on error. + */ +LIBHDFS_EXTERNAL +char ***hdfsGetHosts(hdfsFS fs, const char *path, tOffset start, + tOffset length); + +/** + * hdfsFreeHosts - Free up the structure returned by hdfsGetHosts + * @param hdfsFileInfo The array of dynamically-allocated hdfsFileInfo + * objects. + * @param numEntries The size of the array. + */ +LIBHDFS_EXTERNAL +void hdfsFreeHosts(char ***blockHosts); + +/** + * hdfsGetDefaultBlockSize - Get the default blocksize. + * + * @param fs The configured filesystem handle. + * @deprecated Use hdfsGetDefaultBlockSizeAtPath instead. + * + * @return Returns the default blocksize, or -1 on error. + */ +LIBHDFS_EXTERNAL +tOffset hdfsGetDefaultBlockSize(hdfsFS fs); + +/** + * hdfsGetDefaultBlockSizeAtPath - Get the default blocksize at the + * filesystem indicated by a given path. + * + * @param fs The configured filesystem handle. + * @param path The given path will be used to locate the actual + * filesystem. The full path does not have to exist. + * + * @return Returns the default blocksize, or -1 on error. + */ +LIBHDFS_EXTERNAL +tOffset hdfsGetDefaultBlockSizeAtPath(hdfsFS fs, const char *path); + +/** + * hdfsGetCapacity - Return the raw capacity of the filesystem. + * @param fs The configured filesystem handle. + * @return Returns the raw-capacity; -1 on error. + */ +LIBHDFS_EXTERNAL +tOffset hdfsGetCapacity(hdfsFS fs); + +/** + * hdfsGetUsed - Return the total raw size of all files in the filesystem. + * @param fs The configured filesystem handle. + * @return Returns the total-size; -1 on error. + */ +LIBHDFS_EXTERNAL +tOffset hdfsGetUsed(hdfsFS fs); + +/** + * Change the user and/or group of a file or directory. + * + * @param fs The configured filesystem handle. + * @param path the path to the file or directory + * @param owner User string. Set to NULL for 'no change' + * @param group Group string. Set to NULL for 'no change' + * @return 0 on success else -1 + */ +LIBHDFS_EXTERNAL +int hdfsChown(hdfsFS fs, const char *path, const char *owner, + const char *group); + +/** + * hdfsChmod + * @param fs The configured filesystem handle. + * @param path the path to the file or directory + * @param mode the bitmask to set it to + * @return 0 on success else -1 + */ +LIBHDFS_EXTERNAL +int hdfsChmod(hdfsFS fs, const char *path, short mode); + +/** + * hdfsUtime + * @param fs The configured filesystem handle. + * @param path the path to the file or directory + * @param mtime new modification time or -1 for no change + * @param atime new access time or -1 for no change + * @return 0 on success else -1 + */ +LIBHDFS_EXTERNAL +int hdfsUtime(hdfsFS fs, const char *path, tTime mtime, tTime atime); + +/** + * Allocate a zero-copy options structure. + * + * You must free all options structures allocated with this function using + * hadoopRzOptionsFree. + * + * @return A zero-copy options structure, or NULL if one could + * not be allocated. If NULL is returned, errno will + * contain the error number. + */ +LIBHDFS_EXTERNAL +struct hadoopRzOptions *hadoopRzOptionsAlloc(void); + +/** + * Determine whether we should skip checksums in read0. + * + * @param opts The options structure. + * @param skip Nonzero to skip checksums sometimes; zero to always + * check them. + * + * @return 0 on success; -1 plus errno on failure. + */ +LIBHDFS_EXTERNAL +int hadoopRzOptionsSetSkipChecksum(struct hadoopRzOptions *opts, int skip); + +/** + * Set the ByteBufferPool to use with read0. + * + * @param opts The options structure. + * @param className If this is NULL, we will not use any + * ByteBufferPool. If this is non-NULL, it will be + * treated as the name of the pool class to use. + * For example, you can use + * ELASTIC_BYTE_BUFFER_POOL_CLASS. + * + * @return 0 if the ByteBufferPool class was found and + * instantiated; + * -1 plus errno otherwise. + */ +LIBHDFS_EXTERNAL +int hadoopRzOptionsSetByteBufferPool(struct hadoopRzOptions *opts, + const char *className); + +/** + * Free a hadoopRzOptionsFree structure. + * + * @param opts The options structure to free. + * Any associated ByteBufferPool will also be freed. + */ +LIBHDFS_EXTERNAL +void hadoopRzOptionsFree(struct hadoopRzOptions *opts); + +/** + * Perform a byte buffer read. + * If possible, this will be a zero-copy (mmap) read. + * + * @param file The file to read from. + * @param opts An options structure created by hadoopRzOptionsAlloc. + * @param maxLength The maximum length to read. We may read fewer bytes + * than this length. + * + * @return On success, we will return a new hadoopRzBuffer. + * This buffer will continue to be valid and readable + * until it is released by readZeroBufferFree. Failure to + * release a buffer will lead to a memory leak. + * You can access the data within the hadoopRzBuffer with + * hadoopRzBufferGet. If you have reached EOF, the data + * within the hadoopRzBuffer will be NULL. You must still + * free hadoopRzBuffer instances containing NULL. + * + * On failure, we will return NULL plus an errno code. + * errno = EOPNOTSUPP indicates that we could not do a + * zero-copy read, and there was no ByteBufferPool + * supplied. + */ +LIBHDFS_EXTERNAL +struct hadoopRzBuffer *hadoopReadZero(hdfsFile file, + struct hadoopRzOptions *opts, + int32_t maxLength); + +/** + * Determine the length of the buffer returned from readZero. + * + * @param buffer a buffer returned from readZero. + * @return the length of the buffer. + */ +LIBHDFS_EXTERNAL +int32_t hadoopRzBufferLength(const struct hadoopRzBuffer *buffer); + +/** + * Get a pointer to the raw buffer returned from readZero. + * + * To find out how many bytes this buffer contains, call + * hadoopRzBufferLength. + * + * @param buffer a buffer returned from readZero. + * @return a pointer to the start of the buffer. This will be + * NULL when end-of-file has been reached. + */ +LIBHDFS_EXTERNAL +const void *hadoopRzBufferGet(const struct hadoopRzBuffer *buffer); + +/** + * Release a buffer obtained through readZero. + * + * @param file The hdfs stream that created this buffer. This must be + * the same stream you called hadoopReadZero on. + * @param buffer The buffer to release. + */ +LIBHDFS_EXTERNAL +void hadoopRzBufferFree(hdfsFile file, struct hadoopRzBuffer *buffer); + +#ifdef __cplusplus +} +#endif + +#undef LIBHDFS_EXTERNAL +#endif /*LIBHDFS_HDFS_H*/ + +/** + * vim: ts=4: sw=4: et + */ diff --git a/tensorflow/tools/ci_build/builds/configured b/tensorflow/tools/ci_build/builds/configured index a8842acdf40..e62a6ffe4d0 100755 --- a/tensorflow/tools/ci_build/builds/configured +++ b/tensorflow/tools/ci_build/builds/configured @@ -30,6 +30,8 @@ shift 1 # Enable support for Google Cloud Platform (GCP) export TF_NEED_GCP=1 +# Enable support for HDFS +export TF_NEED_HDFS=1 if [[ "$1" == "--disable-gcp" ]]; then export TF_NEED_GCP=0