Completely rewrite GetMatchingPaths.

The current parallel implementation is too complex (lambda inside lambda, two levels of parallelism) and has a read outside of bounds issue.

The new implementation cleans up artifacts from the previous implementations that were left in the code as it evolves. We add multiple helper functions, and document invariants and preconditions as well as every major step. This way, we fix the security issue and a potential new one which was not caught before

PiperOrigin-RevId: 346146220
Change-Id: Iec0f44673f43349797bf9944dffe9b2f779137d8
This commit is contained in:
Mihai Maruseac 2020-12-07 11:57:01 -08:00
parent 5bf048973c
commit 18d54d1586

View File

@ -19,8 +19,10 @@ limitations under the License.
#include <string>
#include <vector>
#include "tensorflow/core/platform/cpu_info.h"
#include "tensorflow/core/platform/env.h"
#include "tensorflow/core/platform/file_system.h"
#include "tensorflow/core/platform/mutex.h"
#include "tensorflow/core/platform/path.h"
#include "tensorflow/core/platform/platform.h"
#include "tensorflow/core/platform/status.h"
@ -32,7 +34,7 @@ namespace internal {
namespace {
constexpr int kNumThreads = 8;
const int kNumThreads = port::NumSchedulableCPUs();
// Run a function in parallel using a ThreadPool, but skip the ThreadPool
// on the iOS platform due to its problems with more than a few threads.
@ -50,94 +52,217 @@ void ForEach(int first, int last, const std::function<void(int)>& f) {
#endif
}
// A globbing pattern can only start with these characters:
static const char kGlobbingChars[] = "*?[\\";
static inline bool IsGlobbingPattern(const std::string& pattern) {
return (pattern.find_first_of(kGlobbingChars) != std::string::npos);
}
// Make sure that the first entry in `dirs` during glob expansion does not
// contain a glob pattern. This is to prevent a corner-case bug where
// `<pattern>` would be treated differently than `./<pattern>`.
static std::string PatchPattern(const std::string& pattern) {
const std::string fixed_prefix =
pattern.substr(0, pattern.find_first_of(kGlobbingChars));
// Patching is needed when there is no directory part in `prefix`
if (io::Dirname(fixed_prefix).empty()) {
return io::JoinPath(".", pattern);
}
// No patching needed
return pattern;
}
static std::vector<std::string> AllDirectoryPrefixes(const std::string& d) {
std::vector<std::string> dirs;
const std::string patched = PatchPattern(d);
StringPiece dir(patched);
// If the pattern ends with a `/` (or `\\` on Windows), we need to strip it
// otherwise we would have one additional matching step and the result set
// would be empty.
bool is_directory = d[d.size() - 1] == '/';
#ifdef PLATFORM_WINDOWS
is_directory = is_directory || (d[d.size() - 1] == '\\');
#endif
if (is_directory) {
dir = io::Dirname(dir);
}
while (!dir.empty()) {
dirs.emplace_back(dir);
StringPiece new_dir(io::Dirname(dir));
// io::Dirname("/") returns "/" so we need to break the loop.
// On Windows, io::Dirname("C:\\") would return "C:\\", so we check for
// identity of the result instead of checking for dir[0] == `/`.
if (dir == new_dir) break;
dir = new_dir;
}
// Order the array from parent to ancestor (reverse order).
std::reverse(dirs.begin(), dirs.end());
return dirs;
}
static inline int GetFirstGlobbingEntry(const std::vector<std::string>& dirs) {
int i = 0;
for (const auto& d : dirs) {
if (IsGlobbingPattern(d)) {
break;
}
i++;
}
return i;
}
} // namespace
Status GetMatchingPaths(FileSystem* fs, Env* env, const string& pattern,
std::vector<string>* results) {
// Check that `fs`, `env` and `results` are non-null.
if (fs == nullptr || env == nullptr || results == nullptr) {
return Status(tensorflow::error::INVALID_ARGUMENT,
"Filesystem calls GetMatchingPaths with nullptr arguments");
}
// By design, we don't match anything on empty pattern
results->clear();
if (pattern.empty()) {
return Status::OK();
}
string fixed_prefix = pattern.substr(0, pattern.find_first_of("*?[\\"));
string eval_pattern = pattern;
string dir(io::Dirname(fixed_prefix));
// If dir is empty then we need to fix up fixed_prefix and eval_pattern to
// include . as the top level directory.
if (dir.empty()) {
dir = ".";
fixed_prefix = io::JoinPath(dir, fixed_prefix);
eval_pattern = io::JoinPath(dir, eval_pattern);
// The pattern can contain globbing characters at multiple levels, e.g.:
//
// foo/ba?/baz/f*r
//
// To match the full pattern, we must match every prefix subpattern and then
// operate on the children for each match. Thus, we separate all subpatterns
// in the `dirs` vector below.
std::vector<std::string> dirs = AllDirectoryPrefixes(pattern);
// We can have patterns that have several parents where no globbing is being
// done, for example, `foo/bar/baz/*`. We don't need to expand the directories
// which don't contain the globbing characters.
int matching_index = GetFirstGlobbingEntry(dirs);
// If we don't have globbing characters in the pattern then it specifies a
// path in the filesystem. We add it to the result set if it exists.
if (matching_index == dirs.size()) {
if (fs->FileExists(pattern).ok()) {
results->emplace_back(pattern);
}
bool is_directory = pattern[pattern.size() - 1] == '/';
#ifdef PLATFORM_WINDOWS
is_directory = is_directory || pattern[pattern.size() - 1] == '\\';
#endif
std::vector<string> dirs;
if (!is_directory) {
dirs.push_back(eval_pattern);
return Status::OK();
}
StringPiece tmp_dir(io::Dirname(eval_pattern));
while (tmp_dir.size() > dir.size()) {
dirs.push_back(string(tmp_dir));
tmp_dir = io::Dirname(tmp_dir);
}
dirs.push_back(dir);
std::reverse(dirs.begin(), dirs.end());
// Setup a BFS to explore everything under dir.
std::deque<std::pair<string, int>> dir_q;
dir_q.push_back({dirs[0], 0});
Status ret; // Status to return.
// children_dir_status holds is_dir status for children. It can have three
// possible values: OK for true; FAILED_PRECONDITION for false; CANCELLED
// if we don't calculate IsDirectory (we might do that because there isn't
// any point in exploring that child path).
std::vector<Status> children_dir_status;
while (!dir_q.empty()) {
string current_dir = dir_q.front().first;
int dir_index = dir_q.front().second;
dir_index++;
dir_q.pop_front();
std::vector<string> children;
Status s = fs->GetChildren(current_dir, &children);
// In case PERMISSION_DENIED is encountered, we bail here.
// To expand the globbing, we do a BFS from `dirs[matching_index-1]`.
// At every step, we work on a pair `{dir, ix}` such that `dir` is a real
// directory, `ix < dirs.size() - 1` and `dirs[ix+1]` is a globbing pattern.
// To expand the pattern, we select from all the children of `dir` only those
// that match against `dirs[ix+1]`.
// If there are more entries in `dirs` after `dirs[ix+1]` this mean we have
// more patterns to match. So, we add to the queue only those children that
// are also directories, paired with `ix+1`.
// If there are no more entries in `dirs`, we return all children as part of
// the answer.
// Since we can get into a combinatorial explosion issue (e.g., pattern
// `/*/*/*`), we process the queue in parallel. Each parallel processing takes
// elements from `expand_queue` and adds them to `next_expand_queue`, after
// which we swap these two queues (similar to double buffering algorithms).
// PRECONDITION: `IsGlobbingPattern(dirs[0]) == false`
// PRECONDITION: `matching_index > 0`
// INVARIANT: If `{d, ix}` is in queue, then `d` and `dirs[ix]` are at the
// same level in the filesystem tree.
// INVARIANT: If `{d, _}` is in queue, then `IsGlobbingPattern(d) == false`.
// INVARIANT: If `{d, _}` is in queue, then `d` is a real directory.
// INVARIANT: If `{_, ix}` is in queue, then `ix < dirs.size() - 1`.
// INVARIANT: If `{_, ix}` is in queue, `IsGlobbingPattern(dirs[ix + 1])`.
std::deque<std::pair<string, int>> expand_queue;
std::deque<std::pair<string, int>> next_expand_queue;
expand_queue.emplace_back(dirs[matching_index - 1], matching_index - 1);
// Adding to `result` or `new_expand_queue` need to be protected by mutexes
// since there are multiple threads writing to these.
mutex result_mutex;
mutex queue_mutex;
while (!expand_queue.empty()) {
next_expand_queue.clear();
// The work item for every item in `expand_queue`.
// pattern, we process them in parallel.
auto handle_level = [&fs, &results, &dirs, &expand_queue,
&next_expand_queue, &result_mutex,
&queue_mutex](int i) {
// See invariants above, all of these are valid accesses.
const auto& queue_item = expand_queue.at(i);
const std::string& parent = queue_item.first;
const int index = queue_item.second + 1;
const std::string& match_pattern = dirs[index];
// Get all children of `parent`. If this fails, return early.
std::vector<std::string> children;
Status s = fs->GetChildren(parent, &children);
if (s.code() == tensorflow::error::PERMISSION_DENIED) {
return;
}
// Also return early if we don't have any children
if (children.empty()) {
return;
}
// Since we can get extremely many children here and on some filesystems
// `IsDirectory` is expensive, we process the children in parallel.
// We also check that children match the pattern in parallel, for speedup.
// We store the status of the match and `IsDirectory` in
// `children_status` array, one element for each children.
std::vector<Status> children_status(children.size());
auto handle_children = [&fs, &match_pattern, &parent, &children,
&children_status](int j) {
const std::string path = io::JoinPath(parent, children[j]);
if (!fs->Match(path, match_pattern)) {
children_status[j] =
Status(tensorflow::error::CANCELLED, "Operation not needed");
} else {
children_status[j] = fs->IsDirectory(path);
}
};
ForEach(0, children.size(), handle_children);
// At this point, pairing `children` with `children_status` will tell us
// if a children:
// * does not match the pattern
// * matches the pattern and is a directory
// * matches the pattern and is not a directory
// We fully ignore the first case.
// If we matched the last pattern (`index == dirs.size() - 1`) then all
// remaining children get added to the result.
// Otherwise, only the directories get added to the next queue.
for (size_t j = 0; j < children.size(); j++) {
if (children_status[j].code() == tensorflow::error::CANCELLED) {
continue;
}
ret.Update(s);
if (children.empty()) continue;
// This IsDirectory call can be expensive for some FS. Parallelizing it.
children_dir_status.resize(children.size());
ForEach(0, children.size(),
[fs, &current_dir, &children, &dirs, dir_index, is_directory,
&children_dir_status](int i) {
const string child_path = io::JoinPath(current_dir, children[i]);
if (!fs->Match(child_path, dirs[dir_index])) {
children_dir_status[i] = Status(tensorflow::error::CANCELLED,
"Operation not needed");
} else if (dir_index != dirs.size() - 1) {
children_dir_status[i] = fs->IsDirectory(child_path);
} else {
children_dir_status[i] =
is_directory ? fs->IsDirectory(child_path) : Status::OK();
}
});
for (size_t i = 0; i < children.size(); ++i) {
const string child_path = io::JoinPath(current_dir, children[i]);
// If the IsDirectory call was cancelled we bail.
if (children_dir_status[i].code() == tensorflow::error::CANCELLED) {
continue;
}
if (children_dir_status[i].ok()) {
if (dir_index != dirs.size() - 1) {
dir_q.push_back({child_path, dir_index});
} else {
results->push_back(child_path);
const std::string path = io::JoinPath(parent, children[j]);
if (index == dirs.size() - 1) {
mutex_lock l(result_mutex);
results->emplace_back(path);
} else if (children_status[j].ok()) {
mutex_lock l(queue_mutex);
next_expand_queue.emplace_back(path, index);
}
}
};
ForEach(0, expand_queue.size(), handle_level);
// After evaluating one level, swap the "buffers"
std::swap(expand_queue, next_expand_queue);
}
}
return ret;
return Status::OK();
}
} // namespace internal