STT-tensorflow/tensorflow/c/experimental/filesystem/modular_filesystem.cc
2020-12-15 08:43:22 +07:00

485 lines
19 KiB
C++

/* Copyright 2019 The TensorFlow Authors. All Rights Reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
==============================================================================*/
#include "tensorflow/c/experimental/filesystem/modular_filesystem.h"
#include <algorithm>
#include <string>
#include <utility>
#include "tensorflow/c/experimental/filesystem/modular_filesystem_registration.h"
#include "tensorflow/c/tf_status_helper.h"
#include "tensorflow/core/platform/env.h"
#include "tensorflow/core/platform/file_system_helper.h"
#include "tensorflow/core/util/ptr_util.h"
// TODO(mihaimaruseac): After all filesystems are converted, all calls to
// methods from `FileSystem` will have to be replaced with calls to private
// methods here, as part of making this class a singleton and the only way to
// register/use filesystems.
namespace tensorflow {
// Owning handle for a `TF_Status`. The deleter type is that of
// `TF_DeleteStatus`, so every function below constructs it as
// `UniquePtrTo_TF_Status(TF_NewStatus(), TF_DeleteStatus)` and the status is
// released when the handle goes out of scope.
using UniquePtrTo_TF_Status =
::std::unique_ptr<TF_Status, decltype(&TF_DeleteStatus)>;
// Asks the plugin to open `fname` for random-access reads.
//
// Returns `Unimplemented` when the plugin supplies no
// `new_random_access_file` operation. Otherwise the name produced by
// `TranslateName` is passed to the plugin and, only if the plugin reports
// `TF_OK`, `*result` receives a `ModularRandomAccessFile` wrapping the plugin
// file object. The returned status is the plugin status converted with
// `StatusFromTF_Status`. `token` is not used by this function.
Status ModularFileSystem::NewRandomAccessFile(
    const std::string& fname, TransactionToken* token,
    std::unique_ptr<RandomAccessFile>* result) {
  if (ops_->new_random_access_file == nullptr) {
    return errors::Unimplemented(tensorflow::strings::StrCat(
        "Filesystem for ", fname, " does not support NewRandomAccessFile()"));
  }

  UniquePtrTo_TF_Status tf_status(TF_NewStatus(), TF_DeleteStatus);
  auto plugin_file = MakeUnique<TF_RandomAccessFile>();
  std::string plugin_path = TranslateName(fname);
  ops_->new_random_access_file(filesystem_.get(), plugin_path.c_str(),
                               plugin_file.get(), tf_status.get());

  // Hand the file to the caller only when the plugin succeeded.
  const bool opened = (TF_GetCode(tf_status.get()) == TF_OK);
  if (opened) {
    *result = MakeUnique<ModularRandomAccessFile>(
        plugin_path, std::move(plugin_file), random_access_file_ops_.get());
  }
  return StatusFromTF_Status(tf_status.get());
}
// Asks the plugin to create a writable file for `fname`.
//
// Returns `Unimplemented` when the plugin supplies no `new_writable_file`
// operation. On `TF_OK` from the plugin, `*result` receives a
// `ModularWritableFile` wrapping the plugin file object; otherwise `*result`
// is left untouched. The returned status mirrors the plugin status. `token`
// is not used by this function.
Status ModularFileSystem::NewWritableFile(
    const std::string& fname, TransactionToken* token,
    std::unique_ptr<WritableFile>* result) {
  if (ops_->new_writable_file == nullptr) {
    return errors::Unimplemented(tensorflow::strings::StrCat(
        "Filesystem for ", fname, " does not support NewWritableFile()"));
  }

  UniquePtrTo_TF_Status tf_status(TF_NewStatus(), TF_DeleteStatus);
  auto plugin_file = MakeUnique<TF_WritableFile>();
  std::string plugin_path = TranslateName(fname);
  ops_->new_writable_file(filesystem_.get(), plugin_path.c_str(),
                          plugin_file.get(), tf_status.get());

  const bool created = (TF_GetCode(tf_status.get()) == TF_OK);
  if (created) {
    *result = MakeUnique<ModularWritableFile>(
        plugin_path, std::move(plugin_file), writable_file_ops_.get());
  }
  return StatusFromTF_Status(tf_status.get());
}
// Asks the plugin to open `fname` through its `new_appendable_file` operation.
//
// Returns `Unimplemented` when the plugin does not supply that operation. On
// `TF_OK` from the plugin, `*result` receives a `ModularWritableFile` wrapping
// the plugin file object; otherwise `*result` is left untouched. The returned
// status mirrors the plugin status. `token` is not used by this function.
Status ModularFileSystem::NewAppendableFile(
    const std::string& fname, TransactionToken* token,
    std::unique_ptr<WritableFile>* result) {
  if (ops_->new_appendable_file == nullptr) {
    return errors::Unimplemented(tensorflow::strings::StrCat(
        "Filesystem for ", fname, " does not support NewAppendableFile()"));
  }

  UniquePtrTo_TF_Status tf_status(TF_NewStatus(), TF_DeleteStatus);
  auto plugin_file = MakeUnique<TF_WritableFile>();
  std::string plugin_path = TranslateName(fname);
  ops_->new_appendable_file(filesystem_.get(), plugin_path.c_str(),
                            plugin_file.get(), tf_status.get());

  const bool opened = (TF_GetCode(tf_status.get()) == TF_OK);
  if (opened) {
    *result = MakeUnique<ModularWritableFile>(
        plugin_path, std::move(plugin_file), writable_file_ops_.get());
  }
  return StatusFromTF_Status(tf_status.get());
}
// Asks the plugin for a read-only memory region backed by `fname`.
//
// Returns `Unimplemented` when the plugin supplies no
// `new_read_only_memory_region_from_file` operation. On `TF_OK` from the
// plugin, `*result` receives a `ModularReadOnlyMemoryRegion` wrapping the
// plugin region object; otherwise `*result` is left untouched. The returned
// status mirrors the plugin status. `token` is not used by this function.
Status ModularFileSystem::NewReadOnlyMemoryRegionFromFile(
    const std::string& fname, TransactionToken* token,
    std::unique_ptr<ReadOnlyMemoryRegion>* result) {
  if (ops_->new_read_only_memory_region_from_file == nullptr) {
    return errors::Unimplemented(tensorflow::strings::StrCat(
        "Filesystem for ", fname,
        " does not support NewReadOnlyMemoryRegionFromFile()"));
  }

  UniquePtrTo_TF_Status tf_status(TF_NewStatus(), TF_DeleteStatus);
  auto plugin_region = MakeUnique<TF_ReadOnlyMemoryRegion>();
  std::string plugin_path = TranslateName(fname);
  ops_->new_read_only_memory_region_from_file(
      filesystem_.get(), plugin_path.c_str(), plugin_region.get(),
      tf_status.get());

  const bool mapped = (TF_GetCode(tf_status.get()) == TF_OK);
  if (mapped) {
    *result = MakeUnique<ModularReadOnlyMemoryRegion>(
        std::move(plugin_region), read_only_memory_region_ops_.get());
  }
  return StatusFromTF_Status(tf_status.get());
}
// Forwards an existence check for `fname` to the plugin's `path_exists`
// operation and returns the plugin status converted with
// `StatusFromTF_Status`. Returns `Unimplemented` when the plugin does not
// supply `path_exists`. `token` is not used by this function.
Status ModularFileSystem::FileExists(const std::string& fname,
                                     TransactionToken* token) {
  if (ops_->path_exists == nullptr) {
    return errors::Unimplemented(tensorflow::strings::StrCat(
        "Filesystem for ", fname, " does not support FileExists()"));
  }

  UniquePtrTo_TF_Status tf_status(TF_NewStatus(), TF_DeleteStatus);
  const std::string plugin_path = TranslateName(fname);
  ops_->path_exists(filesystem_.get(), plugin_path.c_str(), tf_status.get());
  return StatusFromTF_Status(tf_status.get());
}
bool ModularFileSystem::FilesExist(const std::vector<std::string>& files,
TransactionToken* token,
std::vector<Status>* status) {
if (ops_->paths_exist == nullptr)
return FileSystem::FilesExist(files, token, status);
std::vector<char*> translated_names;
translated_names.reserve(files.size());
for (int i = 0; i < files.size(); i++)
translated_names.push_back(strdup(TranslateName(files[i]).c_str()));
bool result;
if (status == nullptr) {
result = ops_->paths_exist(filesystem_.get(), translated_names.data(),
files.size(), nullptr);
} else {
std::vector<TF_Status*> plugin_status;
plugin_status.reserve(files.size());
for (int i = 0; i < files.size(); i++)
plugin_status.push_back(TF_NewStatus());
result = ops_->paths_exist(filesystem_.get(), translated_names.data(),
files.size(), plugin_status.data());
for (int i = 0; i < files.size(); i++) {
status->push_back(StatusFromTF_Status(plugin_status[i]));
TF_DeleteStatus(plugin_status[i]);
}
}
for (int i = 0; i < files.size(); i++) free(translated_names[i]);
return result;
}
// Lists the entries of `dir` through the plugin's `get_children` operation.
//
// Returns `Unimplemented` when the plugin does not supply `get_children`.
// When the plugin reports a non-negative count, each returned name is
// appended to `*result`. The returned status is the plugin status converted
// with `StatusFromTF_Status`. `token` is not used by this function.
Status ModularFileSystem::GetChildren(const std::string& dir,
                                      TransactionToken* token,
                                      std::vector<std::string>* result) {
  if (ops_->get_children == nullptr) {
    return errors::Unimplemented(tensorflow::strings::StrCat(
        "Filesystem for ", dir, " does not support GetChildren()"));
  }

  UniquePtrTo_TF_Status tf_status(TF_NewStatus(), TF_DeleteStatus);
  std::string plugin_path = TranslateName(dir);

  // The plugin allocates both the array and every entry in it; core
  // TensorFlow must release them with `plugin_memory_free_`.
  char** entries = nullptr;
  const int entry_count = ops_->get_children(
      filesystem_.get(), plugin_path.c_str(), &entries, tf_status.get());

  // A negative count means there is nothing to copy or free.
  if (entry_count < 0) {
    return StatusFromTF_Status(tf_status.get());
  }

  for (int idx = 0; idx < entry_count; ++idx) {
    char* const entry = entries[idx];
    result->push_back(std::string(entry));
    plugin_memory_free_(entry);
  }
  plugin_memory_free_(entries);

  return StatusFromTF_Status(tf_status.get());
}
// Collects the paths matching `pattern`.
//
// When the plugin does not supply `get_matching_paths`, the lookup is done by
// `internal::GetMatchingPaths` against this filesystem and `Env::Default()`.
// Otherwise `pattern` is passed to the plugin as given (it is not run through
// `TranslateName`) and, for a non-negative count, every match is appended to
// `*result`. `token` is not used by this function.
Status ModularFileSystem::GetMatchingPaths(const std::string& pattern,
                                           TransactionToken* token,
                                           std::vector<std::string>* result) {
  if (ops_->get_matching_paths == nullptr) {
    return internal::GetMatchingPaths(this, Env::Default(), pattern, result);
  }

  UniquePtrTo_TF_Status tf_status(TF_NewStatus(), TF_DeleteStatus);

  // The plugin allocates both the array and every entry in it; core
  // TensorFlow must release them with `plugin_memory_free_`.
  char** found = nullptr;
  const int found_count = ops_->get_matching_paths(
      filesystem_.get(), pattern.c_str(), &found, tf_status.get());

  // A negative count means there is nothing to copy or free.
  if (found_count < 0) {
    return StatusFromTF_Status(tf_status.get());
  }

  for (int idx = 0; idx < found_count; ++idx) {
    char* const match = found[idx];
    result->push_back(std::string(match));
    plugin_memory_free_(match);
  }
  plugin_memory_free_(found);

  return StatusFromTF_Status(tf_status.get());
}
// Forwards deletion of `fname` to the plugin's `delete_file` operation and
// returns the plugin status converted with `StatusFromTF_Status`. Returns
// `Unimplemented` when the plugin does not supply `delete_file`. `token` is
// not used by this function.
Status ModularFileSystem::DeleteFile(const std::string& fname,
                                     TransactionToken* token) {
  if (ops_->delete_file == nullptr) {
    return errors::Unimplemented(tensorflow::strings::StrCat(
        "Filesystem for ", fname, " does not support DeleteFile()"));
  }

  UniquePtrTo_TF_Status tf_status(TF_NewStatus(), TF_DeleteStatus);
  std::string plugin_path = TranslateName(fname);
  ops_->delete_file(filesystem_.get(), plugin_path.c_str(), tf_status.get());
  return StatusFromTF_Status(tf_status.get());
}
// Recursively deletes `dirname`.
//
// Args:
//   dirname: directory to remove; translated with `TranslateName` before it
//     is handed to the plugin.
//   token: forwarded to the base-class fallback only.
//   undeleted_files, undeleted_dirs: required outputs that receive the
//     counters reported by the plugin (or by the fallback).
//
// Returns `FailedPrecondition` when either output pointer is null. When the
// plugin does not supply `delete_recursively`, the call is delegated to
// `FileSystem::DeleteRecursively`. Otherwise returns the plugin status
// converted with `StatusFromTF_Status`.
Status ModularFileSystem::DeleteRecursively(const std::string& dirname,
                                            TransactionToken* token,
                                            int64* undeleted_files,
                                            int64* undeleted_dirs) {
  if (undeleted_files == nullptr || undeleted_dirs == nullptr)
    return errors::FailedPrecondition(
        "DeleteRecursively must not be called with `undeleted_files` or "
        "`undeleted_dirs` set to NULL");

  if (ops_->delete_recursively == nullptr)
    return FileSystem::DeleteRecursively(dirname, token, undeleted_files,
                                         undeleted_dirs);

  UniquePtrTo_TF_Status plugin_status(TF_NewStatus(), TF_DeleteStatus);
  std::string translated_name = TranslateName(dirname);

  // Zero-initialize the counters: they are copied to the caller
  // unconditionally below, so a plugin that fails before writing them must
  // not cause a read of indeterminate values.
  uint64_t plugin_undeleted_files = 0;
  uint64_t plugin_undeleted_dirs = 0;
  ops_->delete_recursively(filesystem_.get(), translated_name.c_str(),
                           &plugin_undeleted_files, &plugin_undeleted_dirs,
                           plugin_status.get());

  *undeleted_files = static_cast<int64>(plugin_undeleted_files);
  *undeleted_dirs = static_cast<int64>(plugin_undeleted_dirs);
  return StatusFromTF_Status(plugin_status.get());
}
// Forwards deletion of directory `dirname` to the plugin's `delete_dir`
// operation and returns the plugin status converted with
// `StatusFromTF_Status`. Returns `Unimplemented` when the plugin does not
// supply `delete_dir`. `token` is not used by this function.
Status ModularFileSystem::DeleteDir(const std::string& dirname,
                                    TransactionToken* token) {
  if (ops_->delete_dir == nullptr) {
    return errors::Unimplemented(tensorflow::strings::StrCat(
        "Filesystem for ", dirname, " does not support DeleteDir()"));
  }

  UniquePtrTo_TF_Status tf_status(TF_NewStatus(), TF_DeleteStatus);
  std::string plugin_path = TranslateName(dirname);
  ops_->delete_dir(filesystem_.get(), plugin_path.c_str(), tf_status.get());
  return StatusFromTF_Status(tf_status.get());
}
// Creates `dirname` through the plugin's `recursively_create_dir` operation.
// When the plugin does not supply it, the call is delegated to
// `FileSystem::RecursivelyCreateDir` (which also receives `token`).
// Otherwise returns the plugin status converted with `StatusFromTF_Status`.
Status ModularFileSystem::RecursivelyCreateDir(const std::string& dirname,
                                               TransactionToken* token) {
  if (ops_->recursively_create_dir == nullptr) {
    return FileSystem::RecursivelyCreateDir(dirname, token);
  }

  UniquePtrTo_TF_Status tf_status(TF_NewStatus(), TF_DeleteStatus);
  std::string plugin_path = TranslateName(dirname);
  ops_->recursively_create_dir(filesystem_.get(), plugin_path.c_str(),
                               tf_status.get());
  return StatusFromTF_Status(tf_status.get());
}
// Forwards creation of directory `dirname` to the plugin's `create_dir`
// operation and returns the plugin status converted with
// `StatusFromTF_Status`. Returns `Unimplemented` when the plugin does not
// supply `create_dir`. `token` is not used by this function.
Status ModularFileSystem::CreateDir(const std::string& dirname,
                                    TransactionToken* token) {
  if (ops_->create_dir == nullptr) {
    return errors::Unimplemented(tensorflow::strings::StrCat(
        "Filesystem for ", dirname, " does not support CreateDir()"));
  }

  UniquePtrTo_TF_Status tf_status(TF_NewStatus(), TF_DeleteStatus);
  std::string plugin_path = TranslateName(dirname);
  ops_->create_dir(filesystem_.get(), plugin_path.c_str(), tf_status.get());
  return StatusFromTF_Status(tf_status.get());
}
// Queries the plugin's `stat` operation for `fname`.
//
// Returns `Unimplemented` when the plugin does not supply `stat`, and
// `InvalidArgument` when `stat` (the output pointer) is null; the checks are
// made in that order. The `length`, `mtime_nsec` and `is_directory` fields of
// `*stat` are written only when the plugin reports `TF_OK`. The returned
// status mirrors the plugin status. `token` is not used by this function.
Status ModularFileSystem::Stat(const std::string& fname,
                               TransactionToken* token, FileStatistics* stat) {
  if (ops_->stat == nullptr) {
    return errors::Unimplemented(tensorflow::strings::StrCat(
        "Filesystem for ", fname, " does not support Stat()"));
  }
  if (stat == nullptr) {
    return errors::InvalidArgument("FileStatistics pointer must not be NULL");
  }

  UniquePtrTo_TF_Status tf_status(TF_NewStatus(), TF_DeleteStatus);
  std::string plugin_path = TranslateName(fname);
  TF_FileStatistics plugin_stats;
  ops_->stat(filesystem_.get(), plugin_path.c_str(), &plugin_stats,
             tf_status.get());

  // `plugin_stats` is only read after the plugin has reported success.
  const bool succeeded = (TF_GetCode(tf_status.get()) == TF_OK);
  if (succeeded) {
    stat->length = plugin_stats.length;
    stat->mtime_nsec = plugin_stats.mtime_nsec;
    stat->is_directory = plugin_stats.is_directory;
  }
  return StatusFromTF_Status(tf_status.get());
}
// Asks the plugin's `is_directory` operation about `name`. When the plugin
// does not supply it, the call is delegated to `FileSystem::IsDirectory`
// (which also receives `token`). Otherwise returns the plugin status
// converted with `StatusFromTF_Status`.
Status ModularFileSystem::IsDirectory(const std::string& name,
                                      TransactionToken* token) {
  if (ops_->is_directory == nullptr) {
    return FileSystem::IsDirectory(name, token);
  }

  UniquePtrTo_TF_Status tf_status(TF_NewStatus(), TF_DeleteStatus);
  std::string plugin_path = TranslateName(name);
  ops_->is_directory(filesystem_.get(), plugin_path.c_str(), tf_status.get());
  return StatusFromTF_Status(tf_status.get());
}
// Stores the size of `fname` in `*file_size`.
//
// When the plugin supplies `get_file_size`, its return value is written to
// `*file_size` and the plugin status is returned. Otherwise the size is
// derived from `Stat`: a failing `Stat` status is returned as is, a directory
// yields `FailedPrecondition`, and on success `*file_size` receives the
// reported length.
//
// Args:
//   fname: path to query.
//   token: forwarded to `Stat` on the fallback path.
//   file_size: output location for the size.
Status ModularFileSystem::GetFileSize(const std::string& fname,
                                      TransactionToken* token,
                                      uint64* file_size) {
  if (ops_->get_file_size == nullptr) {
    FileStatistics stat;
    // Forward the caller's token instead of silently dropping it.
    Status status = Stat(fname, token, &stat);
    if (!status.ok()) return status;
    if (stat.is_directory)
      return errors::FailedPrecondition("Called GetFileSize on a directory");
    *file_size = stat.length;
    return status;
  }

  UniquePtrTo_TF_Status plugin_status(TF_NewStatus(), TF_DeleteStatus);
  std::string translated_name = TranslateName(fname);
  *file_size = ops_->get_file_size(filesystem_.get(), translated_name.c_str(),
                                   plugin_status.get());
  return StatusFromTF_Status(plugin_status.get());
}
// Renames `src` to `target`.
//
// When the plugin supplies `rename_file`, both names are translated with
// `TranslateName` and passed to it; the plugin status is returned. Otherwise
// the rename is emulated as `CopyFile` followed, only if the copy succeeded,
// by `DeleteFile` on the source; the first failing status is returned.
//
// Args:
//   src: existing path.
//   target: destination path.
//   token: forwarded to `CopyFile` / `DeleteFile` on the fallback path.
Status ModularFileSystem::RenameFile(const std::string& src,
                                     const std::string& target,
                                     TransactionToken* token) {
  if (ops_->rename_file == nullptr) {
    // Forward the caller's token to both steps instead of dropping it.
    Status status = CopyFile(src, target, token);
    if (status.ok()) status = DeleteFile(src, token);
    return status;
  }

  UniquePtrTo_TF_Status plugin_status(TF_NewStatus(), TF_DeleteStatus);
  std::string translated_src = TranslateName(src);
  std::string translated_target = TranslateName(target);
  ops_->rename_file(filesystem_.get(), translated_src.c_str(),
                    translated_target.c_str(), plugin_status.get());
  return StatusFromTF_Status(plugin_status.get());
}
// Copies `src` to `target` through the plugin's `copy_file` operation. When
// the plugin does not supply it, the call is delegated to
// `FileSystem::CopyFile` (which also receives `token`). Otherwise both names
// are translated with `TranslateName` and the plugin status is returned.
Status ModularFileSystem::CopyFile(const std::string& src,
                                   const std::string& target,
                                   TransactionToken* token) {
  if (ops_->copy_file == nullptr) {
    return FileSystem::CopyFile(src, target, token);
  }

  UniquePtrTo_TF_Status tf_status(TF_NewStatus(), TF_DeleteStatus);
  std::string plugin_src = TranslateName(src);
  std::string plugin_dst = TranslateName(target);
  ops_->copy_file(filesystem_.get(), plugin_src.c_str(), plugin_dst.c_str(),
                  tf_status.get());
  return StatusFromTF_Status(tf_status.get());
}
// Converts `name` into the form passed to the plugin operations.
//
// Uses the plugin's `translate_name` operation when present, otherwise
// `FileSystem::TranslateName`. A null result from the plugin fails the
// `CHECK`.
std::string ModularFileSystem::TranslateName(const std::string& name) const {
  if (ops_->translate_name == nullptr) {
    return FileSystem::TranslateName(name);
  }

  char* raw = ops_->translate_name(filesystem_.get(), name.c_str());
  CHECK(raw != nullptr) << "TranslateName(" << name << ") returned nullptr";

  // Copy into an owned string first; the buffer was allocated by the plugin
  // and therefore has to be released through the plugin's free function.
  std::string translated(raw);
  plugin_memory_free_(raw);
  return translated;
}
// Invokes the plugin's `flush_caches` operation if the plugin supplies one;
// otherwise does nothing. `token` is not used by this function.
void ModularFileSystem::FlushCaches(TransactionToken* token) {
  if (ops_->flush_caches == nullptr) {
    return;
  }
  ops_->flush_caches(filesystem_.get());
}
// Reads up to `n` bytes starting at `offset` through the plugin's `read`
// operation.
//
// Args:
//   offset: position in the file to start reading from.
//   n: maximum number of bytes to read.
//   result: always set on return from the plugin call; it views the bytes the
//     plugin reported as read in `scratch`, and is empty when the plugin
//     reported zero or a negative count.
//   scratch: caller-provided buffer handed to the plugin.
//
// Returns `Unimplemented` when the plugin does not supply `read`; otherwise
// the plugin status converted with `StatusFromTF_Status`.
Status ModularRandomAccessFile::Read(uint64 offset, size_t n,
                                     StringPiece* result, char* scratch) const {
  if (ops_->read == nullptr)
    return errors::Unimplemented(
        tensorflow::strings::StrCat("Read() not implemented for ", filename_));

  UniquePtrTo_TF_Status plugin_status(TF_NewStatus(), TF_DeleteStatus);
  const int64_t read =
      ops_->read(file_.get(), offset, n, scratch, plugin_status.get());

  // Always publish what was read. Leaving `*result` untouched when nothing
  // was read would let the caller observe a stale view from an earlier call.
  const size_t bytes_read = read > 0 ? static_cast<size_t>(read) : 0;
  *result = StringPiece(scratch, bytes_read);
  return StatusFromTF_Status(plugin_status.get());
}
// Points `*result` at the stored `filename_` and returns OK.
Status ModularRandomAccessFile::Name(StringPiece* result) const {
  StringPiece& out = *result;
  out = filename_;
  return Status::OK();
}
// Passes the bytes of `data` to the plugin's `append` operation and returns
// the plugin status converted with `StatusFromTF_Status`. Returns
// `Unimplemented` when the plugin does not supply `append`.
Status ModularWritableFile::Append(StringPiece data) {
  if (ops_->append == nullptr) {
    return errors::Unimplemented(tensorflow::strings::StrCat(
        "Append() not implemented for ", filename_));
  }

  UniquePtrTo_TF_Status tf_status(TF_NewStatus(), TF_DeleteStatus);
  ops_->append(file_.get(), data.data(), data.size(), tf_status.get());
  return StatusFromTF_Status(tf_status.get());
}
// Invokes the plugin's `close` operation on the file and returns the plugin
// status converted with `StatusFromTF_Status`. Returns `Unimplemented` when
// the plugin does not supply `close`.
Status ModularWritableFile::Close() {
  if (ops_->close == nullptr) {
    return errors::Unimplemented(
        tensorflow::strings::StrCat("Close() not implemented for ", filename_));
  }

  UniquePtrTo_TF_Status tf_status(TF_NewStatus(), TF_DeleteStatus);
  ops_->close(file_.get(), tf_status.get());
  return StatusFromTF_Status(tf_status.get());
}
// Invokes the plugin's `flush` operation and returns its converted status.
// A plugin without a `flush` operation is treated as success (returns OK).
Status ModularWritableFile::Flush() {
  if (ops_->flush == nullptr) {
    return Status::OK();
  }

  UniquePtrTo_TF_Status tf_status(TF_NewStatus(), TF_DeleteStatus);
  ops_->flush(file_.get(), tf_status.get());
  return StatusFromTF_Status(tf_status.get());
}
// Invokes the plugin's `sync` operation and returns its converted status.
// When the plugin does not supply `sync`, falls back to `Flush()`.
Status ModularWritableFile::Sync() {
  if (ops_->sync == nullptr) {
    return Flush();
  }

  UniquePtrTo_TF_Status tf_status(TF_NewStatus(), TF_DeleteStatus);
  ops_->sync(file_.get(), tf_status.get());
  return StatusFromTF_Status(tf_status.get());
}
// Points `*result` at the stored `filename_` and returns OK.
Status ModularWritableFile::Name(StringPiece* result) const {
  StringPiece& out = *result;
  out = filename_;
  return Status::OK();
}
// Stores the value returned by the plugin's `tell` operation in `*position`
// and returns the plugin status converted with `StatusFromTF_Status`.
// Returns `Unimplemented` (leaving `*position` untouched) when the plugin
// does not supply `tell`.
Status ModularWritableFile::Tell(int64* position) {
  if (ops_->tell == nullptr) {
    return errors::Unimplemented(
        tensorflow::strings::StrCat("Tell() not implemented for ", filename_));
  }

  UniquePtrTo_TF_Status tf_status(TF_NewStatus(), TF_DeleteStatus);
  *position = ops_->tell(file_.get(), tf_status.get());
  return StatusFromTF_Status(tf_status.get());
}
// Loads the shared object at `dso_path`, looks up its `TF_InitPlugin` symbol,
// calls it on a zeroed `TF_FilesystemPluginInfo`, and passes the filled-in
// info to `filesystem_registration::RegisterFilesystemPluginImpl`.
//
// Errors from loading the library or resolving the symbol are returned
// immediately; otherwise the registration status is returned. The value
// returned by `TF_InitPlugin` itself is not inspected.
Status RegisterFilesystemPlugin(const std::string& dso_path) {
  using InitPluginFn = int (*)(TF_FilesystemPluginInfo*);
  Env* const env = Env::Default();

  // Step 1: Load plugin
  void* library = nullptr;
  TF_RETURN_IF_ERROR(env->LoadDynamicLibrary(dso_path.c_str(), &library));

  // Step 2: Load symbol for `TF_InitPlugin`
  void* init_symbol = nullptr;
  TF_RETURN_IF_ERROR(
      env->GetSymbolFromLibrary(library, "TF_InitPlugin", &init_symbol));

  // Step 3: Call `TF_InitPlugin` on a zero-filled info structure.
  TF_FilesystemPluginInfo plugin_info;
  memset(&plugin_info, 0, sizeof(plugin_info));
  const InitPluginFn init_plugin = reinterpret_cast<InitPluginFn>(init_symbol);
  init_plugin(&plugin_info);

  // Step 4: Do the actual registration
  return filesystem_registration::RegisterFilesystemPluginImpl(&plugin_info);
}
} // namespace tensorflow