Merge pull request #41676 from samikama:Transactions_part8

PiperOrigin-RevId: 323944140
Change-Id: I33073187fbd6cfb01e551381cf076455a1a3f841
This commit is contained in:
TensorFlower Gardener 2020-07-29 23:21:51 -07:00
commit d365a03c9a

View File

@ -32,119 +32,178 @@ limitations under the License.
#include "tensorflow/python/lib/core/pybind11_absl.h"
#include "tensorflow/python/lib/core/pybind11_status.h"
namespace tensorflow {
struct PyTransactionToken {
TransactionToken* token_;
};
inline TransactionToken* TokenFromPyToken(PyTransactionToken* t) {
return (t ? t->token_ : nullptr);
}
} // namespace tensorflow
namespace {
namespace py = pybind11;
PYBIND11_MODULE(_pywrap_file_io, m) {
m.def("FileExists", [](const std::string& filename) {
tensorflow::Status status;
{
py::gil_scoped_release release;
status = tensorflow::Env::Default()->FileExists(filename);
}
tensorflow::MaybeRaiseRegisteredFromStatusWithGIL(status);
});
m.def("DeleteFile", [](const std::string& filename) {
py::gil_scoped_release release;
tensorflow::Status status =
tensorflow::Env::Default()->DeleteFile(filename);
tensorflow::MaybeRaiseRegisteredFromStatusWithGIL(status);
});
m.def("ReadFileToString", [](const std::string& filename) {
std::string data;
py::gil_scoped_release release;
const auto status =
ReadFileToString(tensorflow::Env::Default(), filename, &data);
pybind11::gil_scoped_acquire acquire;
tensorflow::MaybeRaiseRegisteredFromStatus(status);
return py::bytes(data);
});
m.def("WriteStringToFile",
[](const std::string& filename, tensorflow::StringPiece data) {
py::gil_scoped_release release;
const auto status =
WriteStringToFile(tensorflow::Env::Default(), filename, data);
tensorflow::MaybeRaiseRegisteredFromStatusWithGIL(status);
});
m.def("GetChildren", [](const std::string& dirname) {
std::vector<std::string> results;
py::gil_scoped_release release;
const auto status =
tensorflow::Env::Default()->GetChildren(dirname, &results);
pybind11::gil_scoped_acquire acquire;
tensorflow::MaybeRaiseRegisteredFromStatus(status);
return results;
});
m.def("GetMatchingFiles", [](const std::string& pattern) {
std::vector<std::string> results;
py::gil_scoped_release release;
const auto status =
tensorflow::Env::Default()->GetMatchingPaths(pattern, &results);
pybind11::gil_scoped_acquire acquire;
tensorflow::MaybeRaiseRegisteredFromStatus(status);
return results;
});
m.def("CreateDir", [](const std::string& dirname) {
py::gil_scoped_release release;
const auto status = tensorflow::Env::Default()->CreateDir(dirname);
if (tensorflow::errors::IsAlreadyExists(status)) {
return;
}
tensorflow::MaybeRaiseRegisteredFromStatusWithGIL(status);
});
m.def("RecursivelyCreateDir", [](const std::string& dirname) {
py::gil_scoped_release release;
const auto status =
tensorflow::Env::Default()->RecursivelyCreateDir(dirname);
tensorflow::MaybeRaiseRegisteredFromStatusWithGIL(status);
});
m.def("CopyFile",
[](const std::string& src, const std::string& target, bool overwrite) {
py::gil_scoped_release release;
auto* env = tensorflow::Env::Default();
tensorflow::Status status;
if (!overwrite && env->FileExists(target).ok()) {
status = tensorflow::errors::AlreadyExists("file already exists");
} else {
status = env->CopyFile(src, target);
}
tensorflow::MaybeRaiseRegisteredFromStatusWithGIL(status);
});
m.def("RenameFile",
[](const std::string& src, const std::string& target, bool overwrite) {
py::gil_scoped_release release;
auto* env = tensorflow::Env::Default();
tensorflow::Status status;
if (!overwrite && env->FileExists(target).ok()) {
status = tensorflow::errors::AlreadyExists("file already exists");
} else {
status = env->RenameFile(src, target);
}
tensorflow::MaybeRaiseRegisteredFromStatusWithGIL(status);
});
m.def("DeleteRecursively", [](const std::string& dirname) {
py::gil_scoped_release release;
tensorflow::int64 undeleted_files;
tensorflow::int64 undeleted_dirs;
auto status = tensorflow::Env::Default()->DeleteRecursively(
dirname, &undeleted_files, &undeleted_dirs);
if (status.ok() && (undeleted_files > 0 || undeleted_dirs > 0)) {
status =
tensorflow::errors::PermissionDenied("could not fully delete dir");
}
tensorflow::MaybeRaiseRegisteredFromStatusWithGIL(status);
});
m.def("IsDirectory", [](const std::string& dirname) {
py::gil_scoped_release release;
const auto status = tensorflow::Env::Default()->IsDirectory(dirname);
// FAILED_PRECONDITION response means path exists but isn't a dir.
if (tensorflow::errors::IsFailedPrecondition(status)) {
return false;
}
using tensorflow::PyTransactionToken;
using tensorflow::TransactionToken;
py::class_<PyTransactionToken>(m, "TransactionToken")
.def("__repr__", [](const PyTransactionToken* t) {
if (t->token_) {
return std::string(t->token_->owner->DecodeTransaction(t->token_));
}
return std::string("Invalid token!");
});
tensorflow::MaybeRaiseRegisteredFromStatusWithGIL(status);
return true;
});
m.def(
"FileExists",
[](const std::string& filename, PyTransactionToken* token) {
tensorflow::Status status;
{
py::gil_scoped_release release;
status = tensorflow::Env::Default()->FileExists(filename);
}
tensorflow::MaybeRaiseRegisteredFromStatusWithGIL(status);
},
py::arg("filename"), py::arg("token") = (PyTransactionToken*)nullptr);
m.def(
"DeleteFile",
[](const std::string& filename, PyTransactionToken* token) {
py::gil_scoped_release release;
tensorflow::Status status =
tensorflow::Env::Default()->DeleteFile(filename);
tensorflow::MaybeRaiseRegisteredFromStatusWithGIL(status);
},
py::arg("filename"), py::arg("token") = (PyTransactionToken*)nullptr);
m.def(
"ReadFileToString",
[](const std::string& filename, PyTransactionToken* token) {
std::string data;
py::gil_scoped_release release;
const auto status =
ReadFileToString(tensorflow::Env::Default(), filename, &data);
pybind11::gil_scoped_acquire acquire;
tensorflow::MaybeRaiseRegisteredFromStatus(status);
return py::bytes(data);
},
py::arg("filename"), py::arg("token") = (PyTransactionToken*)nullptr);
m.def(
"WriteStringToFile",
[](const std::string& filename, tensorflow::StringPiece data,
PyTransactionToken* token) {
py::gil_scoped_release release;
const auto status =
WriteStringToFile(tensorflow::Env::Default(), filename, data);
tensorflow::MaybeRaiseRegisteredFromStatusWithGIL(status);
},
py::arg("filename"), py::arg("data"),
py::arg("token") = (PyTransactionToken*)nullptr);
m.def(
"GetChildren",
[](const std::string& dirname, PyTransactionToken* token) {
std::vector<std::string> results;
py::gil_scoped_release release;
const auto status =
tensorflow::Env::Default()->GetChildren(dirname, &results);
pybind11::gil_scoped_acquire acquire;
tensorflow::MaybeRaiseRegisteredFromStatus(status);
return results;
},
py::arg("dirname"), py::arg("token") = (PyTransactionToken*)nullptr);
m.def(
"GetMatchingFiles",
[](const std::string& pattern, PyTransactionToken* token) {
std::vector<std::string> results;
py::gil_scoped_release release;
const auto status =
tensorflow::Env::Default()->GetMatchingPaths(pattern, &results);
pybind11::gil_scoped_acquire acquire;
tensorflow::MaybeRaiseRegisteredFromStatus(status);
return results;
},
py::arg("pattern"), py::arg("token") = (PyTransactionToken*)nullptr);
m.def(
"CreateDir",
[](const std::string& dirname, PyTransactionToken* token) {
py::gil_scoped_release release;
const auto status = tensorflow::Env::Default()->CreateDir(dirname);
if (tensorflow::errors::IsAlreadyExists(status)) {
return;
}
tensorflow::MaybeRaiseRegisteredFromStatusWithGIL(status);
},
py::arg("dirname"), py::arg("token") = (PyTransactionToken*)nullptr);
m.def(
"RecursivelyCreateDir",
[](const std::string& dirname, PyTransactionToken* token) {
py::gil_scoped_release release;
const auto status =
tensorflow::Env::Default()->RecursivelyCreateDir(dirname);
tensorflow::MaybeRaiseRegisteredFromStatusWithGIL(status);
},
py::arg("dirname"), py::arg("token") = (PyTransactionToken*)nullptr);
m.def(
"CopyFile",
[](const std::string& src, const std::string& target, bool overwrite,
PyTransactionToken* token) {
py::gil_scoped_release release;
auto* env = tensorflow::Env::Default();
tensorflow::Status status;
if (!overwrite && env->FileExists(target).ok()) {
status = tensorflow::errors::AlreadyExists("file already exists");
} else {
status = env->CopyFile(src, target);
}
tensorflow::MaybeRaiseRegisteredFromStatusWithGIL(status);
},
py::arg("src"), py::arg("target"), py::arg("overwrite"),
py::arg("token") = (PyTransactionToken*)nullptr);
m.def(
"RenameFile",
[](const std::string& src, const std::string& target, bool overwrite,
PyTransactionToken* token) {
py::gil_scoped_release release;
auto* env = tensorflow::Env::Default();
tensorflow::Status status;
if (!overwrite && env->FileExists(target).ok()) {
status = tensorflow::errors::AlreadyExists("file already exists");
} else {
status = env->RenameFile(src, target);
}
tensorflow::MaybeRaiseRegisteredFromStatusWithGIL(status);
},
py::arg("src"), py::arg("target"), py::arg("overwrite"),
py::arg("token") = (PyTransactionToken*)nullptr);
m.def(
"DeleteRecursively",
[](const std::string& dirname, PyTransactionToken* token) {
py::gil_scoped_release release;
tensorflow::int64 undeleted_files;
tensorflow::int64 undeleted_dirs;
auto status = tensorflow::Env::Default()->DeleteRecursively(
dirname, &undeleted_files, &undeleted_dirs);
if (status.ok() && (undeleted_files > 0 || undeleted_dirs > 0)) {
status = tensorflow::errors::PermissionDenied(
"could not fully delete dir");
}
tensorflow::MaybeRaiseRegisteredFromStatusWithGIL(status);
},
py::arg("dirname"), py::arg("token") = (PyTransactionToken*)nullptr);
m.def(
"IsDirectory",
[](const std::string& dirname, PyTransactionToken* token) {
py::gil_scoped_release release;
const auto status = tensorflow::Env::Default()->IsDirectory(dirname);
// FAILED_PRECONDITION response means path exists but isn't a dir.
if (tensorflow::errors::IsFailedPrecondition(status)) {
return false;
}
tensorflow::MaybeRaiseRegisteredFromStatusWithGIL(status);
return true;
},
py::arg("dirname"), py::arg("token") = (PyTransactionToken*)nullptr);
m.def("HasAtomicMove", [](const std::string& path) {
py::gil_scoped_release release;
bool has_atomic_move;
@ -159,29 +218,36 @@ PYBIND11_MODULE(_pywrap_file_io, m) {
.def_readonly("mtime_nsec", &tensorflow::FileStatistics::mtime_nsec)
.def_readonly("is_directory", &tensorflow::FileStatistics::is_directory);
m.def("Stat", [](const std::string& filename) {
py::gil_scoped_release release;
std::unique_ptr<tensorflow::FileStatistics> self(
new tensorflow::FileStatistics);
const auto status = tensorflow::Env::Default()->Stat(filename, self.get());
py::gil_scoped_acquire acquire;
tensorflow::MaybeRaiseRegisteredFromStatus(status);
return self.release();
});
using tensorflow::WritableFile;
py::class_<WritableFile>(m, "WritableFile")
.def(py::init([](const std::string& filename, const std::string& mode) {
m.def(
"Stat",
[](const std::string& filename, PyTransactionToken* token) {
py::gil_scoped_release release;
auto* env = tensorflow::Env::Default();
std::unique_ptr<WritableFile> self;
const auto status = mode.find("a") == std::string::npos
? env->NewWritableFile(filename, &self)
: env->NewAppendableFile(filename, &self);
std::unique_ptr<tensorflow::FileStatistics> self(
new tensorflow::FileStatistics);
const auto status =
tensorflow::Env::Default()->Stat(filename, self.get());
py::gil_scoped_acquire acquire;
tensorflow::MaybeRaiseRegisteredFromStatus(status);
return self.release();
}))
},
py::arg("filename"), py::arg("token") = (PyTransactionToken*)nullptr);
using tensorflow::WritableFile;
py::class_<WritableFile>(m, "WritableFile")
.def(py::init([](const std::string& filename, const std::string& mode,
PyTransactionToken* token) {
py::gil_scoped_release release;
auto* env = tensorflow::Env::Default();
std::unique_ptr<WritableFile> self;
const auto status = mode.find("a") == std::string::npos
? env->NewWritableFile(filename, &self)
: env->NewAppendableFile(filename, &self);
py::gil_scoped_acquire acquire;
tensorflow::MaybeRaiseRegisteredFromStatus(status);
return self.release();
}),
py::arg("filename"), py::arg("mode"),
py::arg("token") = (PyTransactionToken*)nullptr)
.def("append",
[](WritableFile* self, tensorflow::StringPiece data) {
const auto status = self->Append(data);
@ -209,19 +275,24 @@ PYBIND11_MODULE(_pywrap_file_io, m) {
using tensorflow::io::BufferedInputStream;
py::class_<BufferedInputStream>(m, "BufferedInputStream")
.def(py::init([](const std::string& filename, size_t buffer_size) {
py::gil_scoped_release release;
std::unique_ptr<tensorflow::RandomAccessFile> file;
const auto status =
tensorflow::Env::Default()->NewRandomAccessFile(filename, &file);
tensorflow::MaybeRaiseRegisteredFromStatusWithGIL(status);
std::unique_ptr<tensorflow::io::RandomAccessInputStream> input_stream(
new tensorflow::io::RandomAccessInputStream(file.release(),
/*owns_file=*/true));
py::gil_scoped_acquire acquire;
return new BufferedInputStream(input_stream.release(), buffer_size,
/*owns_input_stream=*/true);
}))
.def(py::init([](const std::string& filename, size_t buffer_size,
PyTransactionToken* token) {
py::gil_scoped_release release;
std::unique_ptr<tensorflow::RandomAccessFile> file;
const auto status =
tensorflow::Env::Default()->NewRandomAccessFile(filename,
&file);
tensorflow::MaybeRaiseRegisteredFromStatusWithGIL(status);
std::unique_ptr<tensorflow::io::RandomAccessInputStream>
input_stream(new tensorflow::io::RandomAccessInputStream(
file.release(),
/*owns_file=*/true));
py::gil_scoped_acquire acquire;
return new BufferedInputStream(input_stream.release(), buffer_size,
/*owns_input_stream=*/true);
}),
py::arg("filename"), py::arg("buffer_size"),
py::arg("token") = (PyTransactionToken*)nullptr)
.def("read",
[](BufferedInputStream* self, tensorflow::int64 bytes_to_read) {
py::gil_scoped_release release;