Add TF_USE_FILESYSTEM_METHODS_WITH_NO_TRANSACTION_SUPPORT macro to modified classes
This commit is contained in:
parent
18e4ffd5be
commit
8e75f3b993
@ -59,6 +59,8 @@ class ModularFileSystem final : public FileSystem {
|
|||||||
|
|
||||||
~ModularFileSystem() override { ops_->cleanup(filesystem_.get()); }
|
~ModularFileSystem() override { ops_->cleanup(filesystem_.get()); }
|
||||||
|
|
||||||
|
TF_USE_FILESYSTEM_METHODS_WITH_NO_TRANSACTION_SUPPORT;
|
||||||
|
|
||||||
Status NewRandomAccessFile(
|
Status NewRandomAccessFile(
|
||||||
const std::string& fname, TransactionToken* token,
|
const std::string& fname, TransactionToken* token,
|
||||||
std::unique_ptr<RandomAccessFile>* result) override;
|
std::unique_ptr<RandomAccessFile>* result) override;
|
||||||
|
|||||||
@ -19,9 +19,6 @@ limitations under the License.
|
|||||||
#include <vector>
|
#include <vector>
|
||||||
|
|
||||||
#include "tensorflow/cc/ops/nn_ops.h"
|
#include "tensorflow/cc/ops/nn_ops.h"
|
||||||
#include "tensorflow/core/common_runtime/constant_folding.h"
|
|
||||||
|
|
||||||
#include "tensorflow/cc/ops/array_ops_internal.h"
|
|
||||||
#include "tensorflow/cc/ops/sendrecv_ops.h"
|
#include "tensorflow/cc/ops/sendrecv_ops.h"
|
||||||
#include "tensorflow/cc/ops/standard_ops.h"
|
#include "tensorflow/cc/ops/standard_ops.h"
|
||||||
#include "tensorflow/core/common_runtime/device.h"
|
#include "tensorflow/core/common_runtime/device.h"
|
||||||
@ -687,6 +684,8 @@ class TestTFFileSystem : public ::tensorflow::NullFileSystem {
|
|||||||
: ::tensorflow::NullFileSystem(),
|
: ::tensorflow::NullFileSystem(),
|
||||||
data_tensor_(test::AsTensor<double>({1., 2., 3., 4.}, {2, 2})) {}
|
data_tensor_(test::AsTensor<double>({1., 2., 3., 4.}, {2, 2})) {}
|
||||||
|
|
||||||
|
using ::tensorflow::NullFileSystem::NewReadOnlyMemoryRegionFromFile;
|
||||||
|
|
||||||
::tensorflow::Status NewReadOnlyMemoryRegionFromFile(
|
::tensorflow::Status NewReadOnlyMemoryRegionFromFile(
|
||||||
const string& fname, ::tensorflow::TransactionToken* token,
|
const string& fname, ::tensorflow::TransactionToken* token,
|
||||||
std::unique_ptr<::tensorflow::ReadOnlyMemoryRegion>* result) override {
|
std::unique_ptr<::tensorflow::ReadOnlyMemoryRegion>* result) override {
|
||||||
|
|||||||
@ -60,6 +60,10 @@ class TestReadOnlyMemoryRegion : public ReadOnlyMemoryRegion {
|
|||||||
class TestFileSystem : public NullFileSystem {
|
class TestFileSystem : public NullFileSystem {
|
||||||
public:
|
public:
|
||||||
~TestFileSystem() override = default;
|
~TestFileSystem() override = default;
|
||||||
|
|
||||||
|
// import non-transactional method from the base class
|
||||||
|
using NullFileSystem::NewReadOnlyMemoryRegionFromFile;
|
||||||
|
|
||||||
Status NewReadOnlyMemoryRegionFromFile(
|
Status NewReadOnlyMemoryRegionFromFile(
|
||||||
const string& fname, TransactionToken* token,
|
const string& fname, TransactionToken* token,
|
||||||
std::unique_ptr<ReadOnlyMemoryRegion>* result) override {
|
std::unique_ptr<ReadOnlyMemoryRegion>* result) override {
|
||||||
|
|||||||
@ -125,6 +125,8 @@ class GcsFileSystem : public FileSystem {
|
|||||||
std::pair<const string, const string>* additional_header,
|
std::pair<const string, const string>* additional_header,
|
||||||
bool compose_append);
|
bool compose_append);
|
||||||
|
|
||||||
|
TF_USE_FILESYSTEM_METHODS_WITH_NO_TRANSACTION_SUPPORT;
|
||||||
|
|
||||||
Status NewRandomAccessFile(
|
Status NewRandomAccessFile(
|
||||||
const string& fname, TransactionToken* token,
|
const string& fname, TransactionToken* token,
|
||||||
std::unique_ptr<RandomAccessFile>* result) override;
|
std::unique_ptr<RandomAccessFile>* result) override;
|
||||||
|
|||||||
@ -295,7 +295,9 @@ TEST_F(DefaultEnvTest, SleepForMicroseconds) {
|
|||||||
|
|
||||||
class TmpDirFileSystem : public NullFileSystem {
|
class TmpDirFileSystem : public NullFileSystem {
|
||||||
public:
|
public:
|
||||||
Status FileExists(const string& dir,TransactionToken* token) override {
|
TF_USE_FILESYSTEM_METHODS_WITH_NO_TRANSACTION_SUPPORT;
|
||||||
|
|
||||||
|
Status FileExists(const string& dir, TransactionToken* token) override {
|
||||||
StringPiece scheme, host, path;
|
StringPiece scheme, host, path;
|
||||||
io::ParseURI(dir, &scheme, &host, &path);
|
io::ParseURI(dir, &scheme, &host, &path);
|
||||||
if (path.empty()) return errors::NotFound(dir, " not found");
|
if (path.empty()) return errors::NotFound(dir, " not found");
|
||||||
@ -311,7 +313,7 @@ class TmpDirFileSystem : public NullFileSystem {
|
|||||||
return Env::Default()->FileExists(io::JoinPath(BaseDir(), path));
|
return Env::Default()->FileExists(io::JoinPath(BaseDir(), path));
|
||||||
}
|
}
|
||||||
|
|
||||||
Status CreateDir(const string& dir,TransactionToken* token) override {
|
Status CreateDir(const string& dir, TransactionToken* token) override {
|
||||||
StringPiece scheme, host, path;
|
StringPiece scheme, host, path;
|
||||||
io::ParseURI(dir, &scheme, &host, &path);
|
io::ParseURI(dir, &scheme, &host, &path);
|
||||||
if (scheme != "tmpdirfs") {
|
if (scheme != "tmpdirfs") {
|
||||||
@ -328,7 +330,7 @@ class TmpDirFileSystem : public NullFileSystem {
|
|||||||
return status;
|
return status;
|
||||||
}
|
}
|
||||||
|
|
||||||
Status IsDirectory(const string& dir,TransactionToken* token) override {
|
Status IsDirectory(const string& dir, TransactionToken* token) override {
|
||||||
StringPiece scheme, host, path;
|
StringPiece scheme, host, path;
|
||||||
io::ParseURI(dir, &scheme, &host, &path);
|
io::ParseURI(dir, &scheme, &host, &path);
|
||||||
for (const auto& existing_dir : created_directories_)
|
for (const auto& existing_dir : created_directories_)
|
||||||
|
|||||||
@ -518,6 +518,30 @@ class FileSystem {
|
|||||||
|
|
||||||
virtual ~FileSystem() = default;
|
virtual ~FileSystem() = default;
|
||||||
};
|
};
|
||||||
|
/// This macro adds forwarding methods from FileSystem class to
|
||||||
|
/// used class since name hiding will prevent these to be accessed from
|
||||||
|
/// derived classes and would require all use locations to migrate to
|
||||||
|
/// Transactional API. This is an interim solution until ModularFileSystem class
|
||||||
|
/// becomes a singleton.
|
||||||
|
// TODO(sami): Remove this macro when filesystem plugins migration is complete.
|
||||||
|
#define TF_USE_FILESYSTEM_METHODS_WITH_NO_TRANSACTION_SUPPORT \
|
||||||
|
using FileSystem::NewRandomAccessFile; \
|
||||||
|
using FileSystem::NewWritableFile; \
|
||||||
|
using FileSystem::NewAppendableFile; \
|
||||||
|
using FileSystem::NewReadOnlyMemoryRegionFromFile; \
|
||||||
|
using FileSystem::FileExists; \
|
||||||
|
using FileSystem::GetChildren; \
|
||||||
|
using FileSystem::GetMatchingPaths; \
|
||||||
|
using FileSystem::Stat; \
|
||||||
|
using FileSystem::DeleteFile; \
|
||||||
|
using FileSystem::RecursivelyCreateDir; \
|
||||||
|
using FileSystem::DeleteDir; \
|
||||||
|
using FileSystem::DeleteRecursively; \
|
||||||
|
using FileSystem::GetFileSize; \
|
||||||
|
using FileSystem::RenameFile; \
|
||||||
|
using FileSystem::CopyFile; \
|
||||||
|
using FileSystem::IsDirectory; \
|
||||||
|
using FileSystem::FlushCaches
|
||||||
|
|
||||||
/// A Wrapper class for Transactional FileSystem support.
|
/// A Wrapper class for Transactional FileSystem support.
|
||||||
/// This provides means to make use of the transactions with minimal code change
|
/// This provides means to make use of the transactions with minimal code change
|
||||||
@ -529,6 +553,8 @@ class FileSystem {
|
|||||||
/// transactional filesystem access with minimal code change.
|
/// transactional filesystem access with minimal code change.
|
||||||
class WrappedFileSystem : public FileSystem {
|
class WrappedFileSystem : public FileSystem {
|
||||||
public:
|
public:
|
||||||
|
TF_USE_FILESYSTEM_METHODS_WITH_NO_TRANSACTION_SUPPORT;
|
||||||
|
|
||||||
tensorflow::Status NewRandomAccessFile(
|
tensorflow::Status NewRandomAccessFile(
|
||||||
const std::string& fname, TransactionToken* token,
|
const std::string& fname, TransactionToken* token,
|
||||||
std::unique_ptr<RandomAccessFile>* result) override {
|
std::unique_ptr<RandomAccessFile>* result) override {
|
||||||
@ -691,31 +717,6 @@ class WrappedFileSystem : public FileSystem {
|
|||||||
TransactionToken* token_;
|
TransactionToken* token_;
|
||||||
};
|
};
|
||||||
|
|
||||||
/// This macro adds forwarding methods from FileSystem class to
|
|
||||||
/// used class since name hiding will prevent these to be accessed from
|
|
||||||
/// derived classes and would require all use locations to migrate to
|
|
||||||
/// Transactional API. This is an interim solution until ModularFileSystem class
|
|
||||||
/// becomes a singleton.
|
|
||||||
// TODO(sami): Remove this macro when filesystem plugins migration is complete.
|
|
||||||
#define TF_USE_FILESYSTEM_METHODS_WITH_NO_TRANSACTION_SUPPORT \
|
|
||||||
using FileSystem::NewRandomAccessFile; \
|
|
||||||
using FileSystem::NewWritableFile; \
|
|
||||||
using FileSystem::NewAppendableFile; \
|
|
||||||
using FileSystem::NewReadOnlyMemoryRegionFromFile; \
|
|
||||||
using FileSystem::FileExists; \
|
|
||||||
using FileSystem::GetChildren; \
|
|
||||||
using FileSystem::GetMatchingPaths; \
|
|
||||||
using FileSystem::Stat; \
|
|
||||||
using FileSystem::DeleteFile; \
|
|
||||||
using FileSystem::RecursivelyCreateDir; \
|
|
||||||
using FileSystem::DeleteDir; \
|
|
||||||
using FileSystem::DeleteRecursively; \
|
|
||||||
using FileSystem::GetFileSize; \
|
|
||||||
using FileSystem::RenameFile; \
|
|
||||||
using FileSystem::CopyFile; \
|
|
||||||
using FileSystem::IsDirectory; \
|
|
||||||
using FileSystem::FlushCaches
|
|
||||||
|
|
||||||
/// A file abstraction for randomly reading the contents of a file.
|
/// A file abstraction for randomly reading the contents of a file.
|
||||||
class RandomAccessFile {
|
class RandomAccessFile {
|
||||||
public:
|
public:
|
||||||
|
|||||||
@ -32,6 +32,8 @@ static const char* const kPrefix = "ipfs://solarsystem";
|
|||||||
// cannot have children further.
|
// cannot have children further.
|
||||||
class InterPlanetaryFileSystem : public NullFileSystem {
|
class InterPlanetaryFileSystem : public NullFileSystem {
|
||||||
public:
|
public:
|
||||||
|
TF_USE_FILESYSTEM_METHODS_WITH_NO_TRANSACTION_SUPPORT;
|
||||||
|
|
||||||
Status FileExists(const string& fname, TransactionToken* token) override {
|
Status FileExists(const string& fname, TransactionToken* token) override {
|
||||||
string parsed_path;
|
string parsed_path;
|
||||||
ParsePath(fname, &parsed_path);
|
ParsePath(fname, &parsed_path);
|
||||||
|
|||||||
@ -36,6 +36,8 @@ class NullFileSystem : public FileSystem {
|
|||||||
|
|
||||||
~NullFileSystem() override = default;
|
~NullFileSystem() override = default;
|
||||||
|
|
||||||
|
TF_USE_FILESYSTEM_METHODS_WITH_NO_TRANSACTION_SUPPORT;
|
||||||
|
|
||||||
Status NewRandomAccessFile(
|
Status NewRandomAccessFile(
|
||||||
const string& fname, TransactionToken* token,
|
const string& fname, TransactionToken* token,
|
||||||
std::unique_ptr<RandomAccessFile>* result) override {
|
std::unique_ptr<RandomAccessFile>* result) override {
|
||||||
|
|||||||
@ -103,6 +103,8 @@ class RamRandomAccessFile : public RandomAccessFile, public WritableFile {
|
|||||||
|
|
||||||
class RamFileSystem : public FileSystem {
|
class RamFileSystem : public FileSystem {
|
||||||
public:
|
public:
|
||||||
|
TF_USE_FILESYSTEM_METHODS_WITH_NO_TRANSACTION_SUPPORT;
|
||||||
|
|
||||||
Status NewRandomAccessFile(
|
Status NewRandomAccessFile(
|
||||||
const string& fname, TransactionToken* token,
|
const string& fname, TransactionToken* token,
|
||||||
std::unique_ptr<RandomAccessFile>* result) override {
|
std::unique_ptr<RandomAccessFile>* result) override {
|
||||||
|
|||||||
@ -38,6 +38,8 @@ class RetryingFileSystem : public FileSystem {
|
|||||||
: base_file_system_(std::move(base_file_system)),
|
: base_file_system_(std::move(base_file_system)),
|
||||||
retry_config_(retry_config) {}
|
retry_config_(retry_config) {}
|
||||||
|
|
||||||
|
TF_USE_FILESYSTEM_METHODS_WITH_NO_TRANSACTION_SUPPORT;
|
||||||
|
|
||||||
Status NewRandomAccessFile(
|
Status NewRandomAccessFile(
|
||||||
const string& filename, TransactionToken* token,
|
const string& filename, TransactionToken* token,
|
||||||
std::unique_ptr<RandomAccessFile>* result) override;
|
std::unique_ptr<RandomAccessFile>* result) override;
|
||||||
|
|||||||
@ -99,6 +99,8 @@ class MockFileSystem : public FileSystem {
|
|||||||
explicit MockFileSystem(const ExpectedCalls& calls, bool* flushed = nullptr)
|
explicit MockFileSystem(const ExpectedCalls& calls, bool* flushed = nullptr)
|
||||||
: calls_(calls), flushed_(flushed) {}
|
: calls_(calls), flushed_(flushed) {}
|
||||||
|
|
||||||
|
TF_USE_FILESYSTEM_METHODS_WITH_NO_TRANSACTION_SUPPORT;
|
||||||
|
|
||||||
Status NewRandomAccessFile(
|
Status NewRandomAccessFile(
|
||||||
const string& fname, TransactionToken* token,
|
const string& fname, TransactionToken* token,
|
||||||
std::unique_ptr<RandomAccessFile>* result) override {
|
std::unique_ptr<RandomAccessFile>* result) override {
|
||||||
|
|||||||
@ -811,9 +811,9 @@ Status S3FileSystem::CreateDir(const string& dirname, TransactionToken* token) {
|
|||||||
if (filename.back() != '/') {
|
if (filename.back() != '/') {
|
||||||
filename.push_back('/');
|
filename.push_back('/');
|
||||||
}
|
}
|
||||||
if (!this->FileExists(filename,token).ok()) {
|
if (!this->FileExists(filename, token).ok()) {
|
||||||
std::unique_ptr<WritableFile> file;
|
std::unique_ptr<WritableFile> file;
|
||||||
TF_RETURN_IF_ERROR(NewWritableFile(filename,token, &file));
|
TF_RETURN_IF_ERROR(NewWritableFile(filename, token, &file));
|
||||||
TF_RETURN_IF_ERROR(file->Close());
|
TF_RETURN_IF_ERROR(file->Close());
|
||||||
}
|
}
|
||||||
return Status::OK();
|
return Status::OK();
|
||||||
@ -850,7 +850,7 @@ Status S3FileSystem::DeleteDir(const string& dirname, TransactionToken* token) {
|
|||||||
if (filename.back() != '/') {
|
if (filename.back() != '/') {
|
||||||
filename.push_back('/');
|
filename.push_back('/');
|
||||||
}
|
}
|
||||||
return DeleteFile(filename,token);
|
return DeleteFile(filename, token);
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
TF_RETURN_IF_ERROR(CheckForbiddenError(listObjectsOutcome.GetError()));
|
TF_RETURN_IF_ERROR(CheckForbiddenError(listObjectsOutcome.GetError()));
|
||||||
@ -911,8 +911,8 @@ Status S3FileSystem::CopyFile(const Aws::String& source_bucket,
|
|||||||
Aws::String source = Aws::String((source_bucket + "/" + source_key).c_str());
|
Aws::String source = Aws::String((source_bucket + "/" + source_key).c_str());
|
||||||
Aws::String source_full_path = Aws::String("s3://") + source;
|
Aws::String source_full_path = Aws::String("s3://") + source;
|
||||||
uint64 file_length;
|
uint64 file_length;
|
||||||
TF_RETURN_IF_ERROR(
|
TF_RETURN_IF_ERROR(this->GetFileSize(string(source_full_path.c_str()),
|
||||||
this->GetFileSize(string(source_full_path.c_str()), nullptr, &file_length));
|
nullptr, &file_length));
|
||||||
int num_parts;
|
int num_parts;
|
||||||
if (file_length <=
|
if (file_length <=
|
||||||
multi_part_chunk_size_[Aws::Transfer::TransferDirection::UPLOAD]) {
|
multi_part_chunk_size_[Aws::Transfer::TransferDirection::UPLOAD]) {
|
||||||
|
|||||||
@ -49,67 +49,50 @@ class S3FileSystem : public FileSystem {
|
|||||||
S3FileSystem();
|
S3FileSystem();
|
||||||
~S3FileSystem();
|
~S3FileSystem();
|
||||||
|
|
||||||
Status NewRandomAccessFile(
|
TF_USE_FILESYSTEM_METHODS_WITH_NO_TRANSACTION_SUPPORT;
|
||||||
const string& fname, TransactionToken * token,
|
|
||||||
std::unique_ptr<RandomAccessFile>*
|
|
||||||
result ) override;
|
|
||||||
|
|
||||||
Status NewRandomAccessFile(
|
Status NewRandomAccessFile(
|
||||||
const string& fname, TransactionToken * token,std::unique_ptr<RandomAccessFile>* result,
|
const string& fname, TransactionToken* token,
|
||||||
bool use_multi_part_download );
|
std::unique_ptr<RandomAccessFile>* result) override;
|
||||||
|
|
||||||
Status NewWritableFile(
|
Status NewRandomAccessFile(const string& fname, TransactionToken* token,
|
||||||
const string& fname,TransactionToken * token,
|
std::unique_ptr<RandomAccessFile>* result,
|
||||||
std::unique_ptr<WritableFile>*
|
bool use_multi_part_download);
|
||||||
result ) override;
|
|
||||||
|
|
||||||
Status NewAppendableFile(
|
Status NewWritableFile(const string& fname, TransactionToken* token,
|
||||||
const string& fname,TransactionToken * token,
|
std::unique_ptr<WritableFile>* result) override;
|
||||||
std::unique_ptr<WritableFile>*
|
|
||||||
result ) override;
|
Status NewAppendableFile(const string& fname, TransactionToken* token,
|
||||||
|
std::unique_ptr<WritableFile>* result) override;
|
||||||
|
|
||||||
Status NewReadOnlyMemoryRegionFromFile(
|
Status NewReadOnlyMemoryRegionFromFile(
|
||||||
const string& fname,TransactionToken * token,
|
const string& fname, TransactionToken* token,
|
||||||
std::unique_ptr<ReadOnlyMemoryRegion>*
|
std::unique_ptr<ReadOnlyMemoryRegion>* result) override;
|
||||||
result ) override;
|
|
||||||
|
|
||||||
Status FileExists(
|
Status FileExists(const string& fname, TransactionToken* token) override;
|
||||||
const string& fname,TransactionToken * token ) override;
|
|
||||||
|
|
||||||
Status GetChildren(
|
Status GetChildren(const string& dir, TransactionToken* token,
|
||||||
const string& dir,TransactionToken * token,
|
std::vector<string>* result) override;
|
||||||
std::vector<string>* result )
|
|
||||||
override;
|
|
||||||
|
|
||||||
Status Stat(
|
Status Stat(const string& fname, TransactionToken* token,
|
||||||
const string& fname,TransactionToken * token,
|
FileStatistics* stat) override;
|
||||||
FileStatistics* stat ) override;
|
|
||||||
|
|
||||||
Status GetMatchingPaths(
|
Status GetMatchingPaths(const string& pattern, TransactionToken* token,
|
||||||
const string& pattern,TransactionToken * token,
|
std::vector<string>* results) override;
|
||||||
std::vector<string>* results )
|
|
||||||
override;
|
|
||||||
|
|
||||||
Status DeleteFile(
|
Status DeleteFile(const string& fname, TransactionToken* token) override;
|
||||||
const string& fname,TransactionToken * token ) override;
|
|
||||||
|
|
||||||
Status CreateDir(
|
Status CreateDir(const string& name, TransactionToken* token) override;
|
||||||
const string& name, TransactionToken * token) override;
|
|
||||||
|
|
||||||
Status DeleteDir(
|
Status DeleteDir(const string& name, TransactionToken* token) override;
|
||||||
const string& name,TransactionToken * token ) override;
|
|
||||||
|
|
||||||
Status GetFileSize(
|
Status GetFileSize(const string& fname, TransactionToken* token,
|
||||||
const string& fname,TransactionToken * token,
|
uint64* size) override;
|
||||||
uint64* size ) override;
|
|
||||||
|
|
||||||
Status RenameFile(
|
Status RenameFile(const string& src, const string& target,
|
||||||
const string& src,
|
TransactionToken* token) override;
|
||||||
const string& target,TransactionToken * token ) override;
|
|
||||||
|
|
||||||
Status HasAtomicMove(
|
Status HasAtomicMove(const string& path, bool* has_atomic_move) override;
|
||||||
const string& path,
|
|
||||||
bool* has_atomic_move ) override;
|
|
||||||
|
|
||||||
private:
|
private:
|
||||||
// Returns the member S3 client, initializing as-needed.
|
// Returns the member S3 client, initializing as-needed.
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user