diff --git a/tensorflow/core/platform/file_system_helper.cc b/tensorflow/core/platform/file_system_helper.cc index 4f8bf2e1a52..f8ce2a9226e 100644 --- a/tensorflow/core/platform/file_system_helper.cc +++ b/tensorflow/core/platform/file_system_helper.cc @@ -19,8 +19,10 @@ limitations under the License. #include #include +#include "tensorflow/core/platform/cpu_info.h" #include "tensorflow/core/platform/env.h" #include "tensorflow/core/platform/file_system.h" +#include "tensorflow/core/platform/mutex.h" #include "tensorflow/core/platform/path.h" #include "tensorflow/core/platform/platform.h" #include "tensorflow/core/platform/status.h" @@ -32,7 +34,7 @@ namespace internal { namespace { -constexpr int kNumThreads = 8; +const int kNumThreads = port::NumSchedulableCPUs(); // Run a function in parallel using a ThreadPool, but skip the ThreadPool // on the iOS platform due to its problems with more than a few threads. @@ -50,94 +52,217 @@ void ForEach(int first, int last, const std::function& f) { #endif } +// A pattern is treated as globbing if it contains any of these characters: +static const char kGlobbingChars[] = "*?[\\"; + +static inline bool IsGlobbingPattern(const std::string& pattern) { + return (pattern.find_first_of(kGlobbingChars) != std::string::npos); +} + +// Make sure that the first entry in `dirs` during glob expansion does not +// contain a glob pattern. This is to prevent a corner-case bug where +// `pattern` would be treated differently than `./pattern`. 
+static std::string PatchPattern(const std::string& pattern) { + const std::string fixed_prefix = + pattern.substr(0, pattern.find_first_of(kGlobbingChars)); + + // Patching is needed when there is no directory part in `fixed_prefix` + if (io::Dirname(fixed_prefix).empty()) { + return io::JoinPath(".", pattern); + } + + // No patching needed + return pattern; +} + +static std::vector AllDirectoryPrefixes(const std::string& d) { + std::vector dirs; + const std::string patched = PatchPattern(d); + StringPiece dir(patched); + + // If the pattern ends with a `/` (or `\\` on Windows), we need to strip it + // otherwise we would have one additional matching step and the result set + // would be empty. + bool is_directory = d[d.size() - 1] == '/'; +#ifdef PLATFORM_WINDOWS + is_directory = is_directory || (d[d.size() - 1] == '\\'); +#endif + if (is_directory) { + dir = io::Dirname(dir); + } + + while (!dir.empty()) { + dirs.emplace_back(dir); + StringPiece new_dir(io::Dirname(dir)); + // io::Dirname("/") returns "/" so we need to break the loop. + // On Windows, io::Dirname("C:\\") would return "C:\\", so we check for + // identity of the result instead of checking for dir[0] == `/`. + if (dir == new_dir) break; + dir = new_dir; + } + + // Reverse so that entries go from the shortest prefix to the full pattern. + std::reverse(dirs.begin(), dirs.end()); + + return dirs; +} + +static inline int GetFirstGlobbingEntry(const std::vector& dirs) { + int i = 0; + for (const auto& d : dirs) { + if (IsGlobbingPattern(d)) { + break; + } + i++; + } + return i; +} + } // namespace Status GetMatchingPaths(FileSystem* fs, Env* env, const string& pattern, std::vector* results) { + // Check that `fs`, `env` and `results` are non-null. 
+ if (fs == nullptr || env == nullptr || results == nullptr) { + return Status(tensorflow::error::INVALID_ARGUMENT, + "Filesystem calls GetMatchingPaths with nullptr arguments"); + } + + // By design, we don't match anything on empty pattern results->clear(); if (pattern.empty()) { return Status::OK(); } - string fixed_prefix = pattern.substr(0, pattern.find_first_of("*?[\\")); - string eval_pattern = pattern; - string dir(io::Dirname(fixed_prefix)); - // If dir is empty then we need to fix up fixed_prefix and eval_pattern to - // include . as the top level directory. - if (dir.empty()) { - dir = "."; - fixed_prefix = io::JoinPath(dir, fixed_prefix); - eval_pattern = io::JoinPath(dir, eval_pattern); - } - bool is_directory = pattern[pattern.size() - 1] == '/'; -#ifdef PLATFORM_WINDOWS - is_directory = is_directory || pattern[pattern.size() - 1] == '\\'; -#endif - std::vector dirs; - if (!is_directory) { - dirs.push_back(eval_pattern); - } - StringPiece tmp_dir(io::Dirname(eval_pattern)); - while (tmp_dir.size() > dir.size()) { - dirs.push_back(string(tmp_dir)); - tmp_dir = io::Dirname(tmp_dir); - } - dirs.push_back(dir); - std::reverse(dirs.begin(), dirs.end()); - // Setup a BFS to explore everything under dir. - std::deque> dir_q; - dir_q.push_back({dirs[0], 0}); - Status ret; // Status to return. - // children_dir_status holds is_dir status for children. It can have three - // possible values: OK for true; FAILED_PRECONDITION for false; CANCELLED - // if we don't calculate IsDirectory (we might do that because there isn't - // any point in exploring that child path). - std::vector children_dir_status; - while (!dir_q.empty()) { - string current_dir = dir_q.front().first; - int dir_index = dir_q.front().second; - dir_index++; - dir_q.pop_front(); - std::vector children; - Status s = fs->GetChildren(current_dir, &children); - // In case PERMISSION_DENIED is encountered, we bail here. 
- if (s.code() == tensorflow::error::PERMISSION_DENIED) { - continue; + // The pattern can contain globbing characters at multiple levels, e.g.: + // + // foo/ba?/baz/f*r + // + // To match the full pattern, we must match every prefix subpattern and then + // operate on the children for each match. Thus, we separate all subpatterns + // in the `dirs` vector below. + std::vector dirs = AllDirectoryPrefixes(pattern); + + // We can have patterns that have several parents where no globbing is being + // done, for example, `foo/bar/baz/*`. We don't need to expand the directories + // which don't contain the globbing characters. + int matching_index = GetFirstGlobbingEntry(dirs); + + // If we don't have globbing characters in the pattern then it specifies a + // path in the filesystem. We add it to the result set if it exists. + if (matching_index == dirs.size()) { + if (fs->FileExists(pattern).ok()) { + results->emplace_back(pattern); } - ret.Update(s); - if (children.empty()) continue; - // This IsDirectory call can be expensive for some FS. Parallelizing it. - children_dir_status.resize(children.size()); - ForEach(0, children.size(), - [fs, ¤t_dir, &children, &dirs, dir_index, is_directory, - &children_dir_status](int i) { - const string child_path = io::JoinPath(current_dir, children[i]); - if (!fs->Match(child_path, dirs[dir_index])) { - children_dir_status[i] = Status(tensorflow::error::CANCELLED, - "Operation not needed"); - } else if (dir_index != dirs.size() - 1) { - children_dir_status[i] = fs->IsDirectory(child_path); - } else { - children_dir_status[i] = - is_directory ? fs->IsDirectory(child_path) : Status::OK(); - } - }); - for (size_t i = 0; i < children.size(); ++i) { - const string child_path = io::JoinPath(current_dir, children[i]); - // If the IsDirectory call was cancelled we bail. 
- if (children_dir_status[i].code() == tensorflow::error::CANCELLED) { - continue; + return Status::OK(); + } + + // To expand the globbing, we do a BFS from `dirs[matching_index-1]`. + // At every step, we work on a pair `{dir, ix}` such that `dir` is a real + // directory, `ix < dirs.size() - 1` and `dirs[ix+1]` is a globbing pattern. + // To expand the pattern, we select from all the children of `dir` only those + // that match against `dirs[ix+1]`. + // If there are more entries in `dirs` after `dirs[ix+1]` this means we have + // more patterns to match. So, we add to the queue only those children that + // are also directories, paired with `ix+1`. + // If there are no more entries in `dirs`, we return all matching children as + // part of the answer. + // Since we can get into a combinatorial explosion issue (e.g., pattern + // `/*/*/*`), we process the queue in parallel. Each parallel processing takes + // elements from `expand_queue` and adds them to `next_expand_queue`, after + // which we swap these two queues (similar to double buffering algorithms). + // PRECONDITION: `IsGlobbingPattern(dirs[0]) == false` + // PRECONDITION: `matching_index > 0` (NOTE(review): not checked below) + // INVARIANT: If `{d, ix}` is in queue, then `d` and `dirs[ix]` are at the + // same level in the filesystem tree. + // INVARIANT: If `{d, _}` is in queue, then `IsGlobbingPattern(d) == false`. + // INVARIANT: If `{d, _}` is in queue, then `d` is a real directory. + // INVARIANT: If `{_, ix}` is in queue, then `ix < dirs.size() - 1`. + // INVARIANT: If `{_, ix}` is in queue, `IsGlobbingPattern(dirs[ix + 1])`. + std::deque> expand_queue; + std::deque> next_expand_queue; + expand_queue.emplace_back(dirs[matching_index - 1], matching_index - 1); + + // Adding to `results` or `next_expand_queue` needs to be protected by mutexes + // since there are multiple threads writing to these. 
+ mutex result_mutex; + mutex queue_mutex; + + while (!expand_queue.empty()) { + next_expand_queue.clear(); + + // The work item for every entry in `expand_queue`; the `ForEach` call + // below runs these in parallel. + auto handle_level = [&fs, &results, &dirs, &expand_queue, + &next_expand_queue, &result_mutex, + &queue_mutex](int i) { + // See invariants above, all of these are valid accesses. + const auto& queue_item = expand_queue.at(i); + const std::string& parent = queue_item.first; + const int index = queue_item.second + 1; + const std::string& match_pattern = dirs[index]; + + // Get all children of `parent`. On PERMISSION_DENIED, return early. + std::vector children; + Status s = fs->GetChildren(parent, &children); + if (s.code() == tensorflow::error::PERMISSION_DENIED) { + return; } - if (children_dir_status[i].ok()) { - if (dir_index != dirs.size() - 1) { - dir_q.push_back({child_path, dir_index}); + + // Also return early if we don't have any children + if (children.empty()) { + return; + } + + // Since we can get extremely many children here and on some filesystems + // `IsDirectory` is expensive, we process the children in parallel. + // We also check that children match the pattern in parallel, for speedup. + // We store the status of the match and `IsDirectory` in + // `children_status` array, one element for each child. 
+ std::vector children_status(children.size()); + auto handle_children = [&fs, &match_pattern, &parent, &children, + &children_status](int j) { + const std::string path = io::JoinPath(parent, children[j]); + if (!fs->Match(path, match_pattern)) { + children_status[j] = + Status(tensorflow::error::CANCELLED, "Operation not needed"); } else { - results->push_back(child_path); + children_status[j] = fs->IsDirectory(path); + } + }; + ForEach(0, children.size(), handle_children); + + // At this point, pairing `children` with `children_status` will tell us + // if a child: + // * does not match the pattern + // * matches the pattern and is a directory + // * matches the pattern and is not a directory + // We fully ignore the first case. + // If we matched the last pattern (`index == dirs.size() - 1`) then all + // remaining children get added to the result. + // Otherwise, only the directories get added to the next queue. + for (size_t j = 0; j < children.size(); j++) { + if (children_status[j].code() == tensorflow::error::CANCELLED) { + continue; + } + + const std::string path = io::JoinPath(parent, children[j]); + if (index == dirs.size() - 1) { + mutex_lock l(result_mutex); + results->emplace_back(path); + } else if (children_status[j].ok()) { + mutex_lock l(queue_mutex); + next_expand_queue.emplace_back(path, index); } } - } + }; + ForEach(0, expand_queue.size(), handle_level); + + // After evaluating one level, swap the "buffers" + std::swap(expand_queue, next_expand_queue); } - return ret; + + return Status::OK(); } } // namespace internal